X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning%2Fsrc%2Fln%2Fchannelmanager.rs;h=e475cf13854ad7eca231811847c3fbb7dab40511;hb=782eb3658fbc53ceb9c64847692061f2198cb786;hp=fa98e8e9b3fe3a859837c89bbcc468059594b7e5;hpb=ad82c9ea5bf735068c11c8eebdf68ac57f135eec;p=rust-lightning diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index fa98e8e9..e475cf13 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -34,39 +34,39 @@ use bitcoin::secp256k1::Secp256k1; use bitcoin::secp256k1::ecdh::SharedSecret; use bitcoin::{LockTime, secp256k1, Sequence}; -use chain; -use chain::{Confirm, ChannelMonitorUpdateStatus, Watch, BestBlock}; -use chain::chaininterface::{BroadcasterInterface, ConfirmationTarget, FeeEstimator, LowerBoundedFeeEstimator}; -use chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, ChannelMonitorUpdateStep, HTLC_FAIL_BACK_BUFFER, CLTV_CLAIM_BUFFER, LATENCY_GRACE_PERIOD_BLOCKS, ANTI_REORG_DELAY, MonitorEvent, CLOSED_CHANNEL_UPDATE_ID}; -use chain::transaction::{OutPoint, TransactionData}; +use crate::chain; +use crate::chain::{Confirm, ChannelMonitorUpdateStatus, Watch, BestBlock}; +use crate::chain::chaininterface::{BroadcasterInterface, ConfirmationTarget, FeeEstimator, LowerBoundedFeeEstimator}; +use crate::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, ChannelMonitorUpdateStep, HTLC_FAIL_BACK_BUFFER, CLTV_CLAIM_BUFFER, LATENCY_GRACE_PERIOD_BLOCKS, ANTI_REORG_DELAY, MonitorEvent, CLOSED_CHANNEL_UPDATE_ID}; +use crate::chain::transaction::{OutPoint, TransactionData}; // Since this struct is returned in `list_channels` methods, expose it here in case users want to // construct one themselves. -use ln::{inbound_payment, PaymentHash, PaymentPreimage, PaymentSecret}; -use ln::channel::{Channel, ChannelError, ChannelUpdateStatus, UpdateFulfillCommitFetch}; -use ln::features::{ChannelFeatures, ChannelTypeFeatures, InitFeatures, NodeFeatures}; +use crate::ln::{inbound_payment, PaymentHash, PaymentPreimage, PaymentSecret}; +use crate::ln::channel::{Channel, ChannelError, ChannelUpdateStatus, UpdateFulfillCommitFetch}; +use crate::ln::features::{ChannelFeatures, ChannelTypeFeatures, InitFeatures, NodeFeatures}; #[cfg(any(feature = "_test_utils", test))] -use ln::features::InvoiceFeatures; -use routing::router::{PaymentParameters, Route, RouteHop, RoutePath, RouteParameters}; -use ln::msgs; -use ln::onion_utils; -use ln::msgs::{ChannelMessageHandler, DecodeError, LightningError, MAX_VALUE_MSAT}; -use ln::wire::Encode; -use chain::keysinterface::{Sign, KeysInterface, KeysManager, InMemorySigner, Recipient}; -use util::config::{UserConfig, ChannelConfig}; -use util::events::{EventHandler, EventsProvider, MessageSendEvent, MessageSendEventsProvider, ClosureReason, HTLCDestination}; -use util::{byte_utils, events}; -use util::wakers::{Future, Notifier}; -use util::scid_utils::fake_scid; -use util::ser::{BigSize, FixedLengthReader, Readable, ReadableArgs, MaybeReadable, Writeable, Writer, VecWriter}; -use util::logger::{Level, Logger}; -use util::errors::APIError; - -use io; -use prelude::*; +use crate::ln::features::InvoiceFeatures; +use crate::routing::router::{PaymentParameters, Route, RouteHop, RoutePath, RouteParameters}; +use crate::ln::msgs; +use crate::ln::onion_utils; +use crate::ln::msgs::{ChannelMessageHandler, DecodeError, LightningError, MAX_VALUE_MSAT}; +use crate::ln::wire::Encode; +use crate::chain::keysinterface::{Sign, KeysInterface, KeysManager, Recipient}; +use crate::util::config::{UserConfig, ChannelConfig}; +use crate::util::events::{EventHandler, EventsProvider, MessageSendEvent, MessageSendEventsProvider, ClosureReason, HTLCDestination}; +use crate::util::{byte_utils, events}; +use crate::util::wakers::{Future, Notifier}; +use crate::util::scid_utils::fake_scid; +use crate::util::ser::{BigSize, FixedLengthReader, Readable, ReadableArgs, MaybeReadable, Writeable, Writer, VecWriter}; +use crate::util::logger::{Level, Logger}; +use crate::util::errors::APIError; + +use crate::io; +use crate::prelude::*; use core::{cmp, mem}; use core::cell::RefCell; -use io::Read; -use sync::{Arc, Mutex, MutexGuard, RwLock, RwLockReadGuard}; +use crate::io::Read; +use crate::sync::{Arc, Mutex, MutexGuard, RwLock, RwLockReadGuard, FairRwLock}; use core::sync::atomic::{AtomicUsize, Ordering}; use core::time::Duration; use core::ops::Deref; @@ -395,19 +395,6 @@ pub(super) enum RAACommitmentOrder { // Note this is only exposed in cfg(test): pub(super) struct ChannelHolder { pub(super) by_id: HashMap<[u8; 32], Channel>, - /// SCIDs (and outbound SCID aliases) -> `counterparty_node_id`s and `channel_id`s. - /// - /// Outbound SCID aliases are added here once the channel is available for normal use, with - /// SCIDs being added once the funding transaction is confirmed at the channel's required - /// confirmation depth. - pub(super) short_to_chan_info: HashMap, - /// Map from payment hash to the payment data and any HTLCs which are to us and can be - /// failed/claimed by the user. - /// - /// Note that while this is held in the same mutex as the channels themselves, no consistency - /// guarantees are made about the channels given here actually existing anymore by the time you - /// go to read them! - claimable_htlcs: HashMap)>, /// Messages to send to peers - pushed to in the same lock that they are generated in (except /// for broadcast messages, where ordering isn't as strict). pub(super) pending_msg_events: Vec, @@ -473,6 +460,7 @@ pub(crate) enum PendingOutboundPayment { Fulfilled { session_privs: HashSet<[u8; 32]>, payment_hash: Option, + timer_ticks_without_htlcs: u8, }, /// When a payer gives up trying to retry a payment, they inform us, letting us generate a /// `PaymentFailed` event when all HTLCs have irrevocably failed. This avoids a number of race @@ -488,12 +476,6 @@ pub(crate) enum PendingOutboundPayment { } impl PendingOutboundPayment { - fn is_retryable(&self) -> bool { - match self { - PendingOutboundPayment::Retryable { .. } => true, - _ => false, - } - } fn is_fulfilled(&self) -> bool { match self { PendingOutboundPayment::Fulfilled { .. } => true, @@ -532,7 +514,7 @@ impl PendingOutboundPayment { => session_privs, }); let payment_hash = self.payment_hash(); - *self = PendingOutboundPayment::Fulfilled { session_privs, payment_hash }; + *self = PendingOutboundPayment::Fulfilled { session_privs, payment_hash, timer_ticks_without_htlcs: 0 }; } fn mark_abandoned(&mut self) -> Result<(), ()> { @@ -617,7 +599,7 @@ impl PendingOutboundPayment { /// concrete type of the KeysManager. /// /// (C-not exported) as Arcs don't make sense in bindings -pub type SimpleArcChannelManager = ChannelManager, Arc, Arc, Arc, Arc>; +pub type SimpleArcChannelManager = ChannelManager, Arc, Arc, Arc, Arc>; /// SimpleRefChannelManager is a type alias for a ChannelManager reference, and is the reference /// counterpart to the SimpleArcChannelManager type alias. Use this type by default when you don't @@ -629,7 +611,7 @@ pub type SimpleArcChannelManager = ChannelManager = ChannelManager; +pub type SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, M, T, F, L> = ChannelManager<&'a M, &'b T, &'c KeysManager, &'d F, &'e L>; /// Manager which keeps track of a number of channels and sends messages to the appropriate /// channel, also tracking HTLC preimages and forwarding onion packets appropriately. @@ -681,17 +663,21 @@ pub type SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, M, T, F, L> = ChannelManage // | // |__`forward_htlcs` // | -// |__`channel_state` +// |__`pending_inbound_payments` // | | -// | |__`id_to_peer` +// | |__`claimable_htlcs` // | | -// | |__`per_peer_state` -// | | -// | |__`outbound_scid_aliases` +// | |__`pending_outbound_payments` // | | -// | |__`pending_inbound_payments` +// | |__`channel_state` // | | -// | |__`pending_outbound_payments` +// | |__`id_to_peer` +// | | +// | |__`short_to_chan_info` +// | | +// | |__`per_peer_state` +// | | +// | |__`outbound_scid_aliases` // | | // | |__`best_block` // | | @@ -699,10 +685,10 @@ pub type SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, M, T, F, L> = ChannelManage // | | // | |__`pending_background_events` // -pub struct ChannelManager - where M::Target: chain::Watch, +pub struct ChannelManager + where M::Target: chain::Watch<::Signer>, T::Target: BroadcasterInterface, - K::Target: KeysInterface, + K::Target: KeysInterface, F::Target: FeeEstimator, L::Target: Logger, { @@ -721,9 +707,9 @@ pub struct ChannelManager>, + pub(super) channel_state: Mutex::Signer>>, #[cfg(not(any(test, feature = "_test_utils")))] - channel_state: Mutex>, + channel_state: Mutex::Signer>>, /// Storage for PaymentSecrets and any requirements on future inbound payments before we will /// expose them to users via a PaymentReceived event. HTLCs which do not meet the requirements @@ -762,6 +748,15 @@ pub struct ChannelManager>>, + /// Map from payment hash to the payment data and any HTLCs which are to us and can be + /// failed/claimed by the user. + /// + /// Note that, no consistency guarantees are made about the channels given here actually + /// existing anymore by the time you go to read them! + /// + /// See `ChannelManager` struct-level documentation for lock order requirements. + claimable_htlcs: Mutex)>>, + /// The set of outbound SCID aliases across all our channels, including unconfirmed channels /// and some closed channels which reached a usable state prior to being closed. This is used /// only to avoid duplicates, and is not persisted explicitly to disk, but rebuilt from the @@ -791,6 +786,22 @@ pub struct ChannelManager>, + /// SCIDs (and outbound SCID aliases) -> `counterparty_node_id`s and `channel_id`s. + /// + /// Outbound SCID aliases are added here once the channel is available for normal use, with + /// SCIDs being added once the funding transaction is confirmed at the channel's required + /// confirmation depth. + /// + /// Note that while this holds `counterparty_node_id`s and `channel_id`s, no consistency + /// guarantees are made about the existence of a peer with the `counterparty_node_id` nor a + /// channel with the `channel_id` in our other maps. + /// + /// See `ChannelManager` struct-level documentation for lock order requirements. + #[cfg(test)] + pub(super) short_to_chan_info: FairRwLock>, + #[cfg(not(test))] + short_to_chan_info: FairRwLock>, + our_network_key: SecretKey, our_network_pubkey: PublicKey, @@ -959,13 +970,14 @@ const CHECK_CLTV_EXPIRY_SANITY: u32 = MIN_CLTV_EXPIRY_DELTA as u32 - LATENCY_GRA #[allow(dead_code)] const CHECK_CLTV_EXPIRY_SANITY_2: u32 = MIN_CLTV_EXPIRY_DELTA as u32 - LATENCY_GRACE_PERIOD_BLOCKS - 2*CLTV_CLAIM_BUFFER; -/// The number of blocks before we consider an outbound payment for expiry if it doesn't have any -/// pending HTLCs in flight. -pub(crate) const PAYMENT_EXPIRY_BLOCKS: u32 = 3; - /// The number of ticks of [`ChannelManager::timer_tick_occurred`] until expiry of incomplete MPPs pub(crate) const MPP_TIMEOUT_TICKS: u8 = 3; +/// The number of ticks of [`ChannelManager::timer_tick_occurred`] until we time-out the +/// idempotency of payments by [`PaymentId`]. See +/// [`ChannelManager::remove_stale_resolved_payments`]. +pub(crate) const IDEMPOTENCY_TIMEOUT_TICKS: u8 = 7; + /// Information needed for constructing an invoice route hint for this channel. #[derive(Clone, Debug, PartialEq)] pub struct CounterpartyForwardingInfo { @@ -1206,6 +1218,9 @@ pub enum PaymentSendFailure { /// All paths which were attempted failed to send, with no channel state change taking place. /// You can freely retry the payment in full (though you probably want to do so over different /// paths than the ones selected). + /// + /// [`ChannelManager::abandon_payment`] does *not* need to be called for this payment and + /// [`ChannelManager::retry_payment`] will *not* work for this payment. AllFailedRetrySafe(Vec), /// Some paths which were attempted failed to send, though possibly not all. At least some /// paths have irrevocably committed to the HTLC and retrying the payment in full would result @@ -1296,9 +1311,11 @@ macro_rules! handle_error { } macro_rules! update_maps_on_chan_removal { - ($self: expr, $short_to_chan_info: expr, $channel: expr) => { + ($self: expr, $channel: expr) => {{ + $self.id_to_peer.lock().unwrap().remove(&$channel.channel_id()); + let mut short_to_chan_info = $self.short_to_chan_info.write().unwrap(); if let Some(short_id) = $channel.get_short_channel_id() { - $short_to_chan_info.remove(&short_id); + short_to_chan_info.remove(&short_id); } else { // If the channel was never confirmed on-chain prior to its closure, remove the // outbound SCID alias we used for it from the collision-prevention set. While we @@ -1309,14 +1326,13 @@ macro_rules! update_maps_on_chan_removal { let alias_removed = $self.outbound_scid_aliases.lock().unwrap().remove(&$channel.outbound_scid_alias()); debug_assert!(alias_removed); } - $self.id_to_peer.lock().unwrap().remove(&$channel.channel_id()); - $short_to_chan_info.remove(&$channel.outbound_scid_alias()); - } + short_to_chan_info.remove(&$channel.outbound_scid_alias()); + }} } /// Returns (boolean indicating if we should remove the Channel object from memory, a mapped error) macro_rules! convert_chan_err { - ($self: ident, $err: expr, $short_to_chan_info: expr, $channel: expr, $channel_id: expr) => { + ($self: ident, $err: expr, $channel: expr, $channel_id: expr) => { match $err { ChannelError::Warn(msg) => { (false, MsgHandleErrInternal::from_chan_no_close(ChannelError::Warn(msg), $channel_id.clone())) @@ -1326,7 +1342,7 @@ macro_rules! convert_chan_err { }, ChannelError::Close(msg) => { log_error!($self.logger, "Closing channel {} due to close-required error: {}", log_bytes!($channel_id[..]), msg); - update_maps_on_chan_removal!($self, $short_to_chan_info, $channel); + update_maps_on_chan_removal!($self, $channel); let shutdown_res = $channel.force_shutdown(true); (true, MsgHandleErrInternal::from_finish_shutdown(msg, *$channel_id, $channel.get_user_id(), shutdown_res, $self.get_channel_update_for_broadcast(&$channel).ok())) @@ -1336,11 +1352,11 @@ macro_rules! convert_chan_err { } macro_rules! break_chan_entry { - ($self: ident, $res: expr, $channel_state: expr, $entry: expr) => { + ($self: ident, $res: expr, $entry: expr) => { match $res { Ok(res) => res, Err(e) => { - let (drop, res) = convert_chan_err!($self, e, $channel_state.short_to_chan_info, $entry.get_mut(), $entry.key()); + let (drop, res) = convert_chan_err!($self, e, $entry.get_mut(), $entry.key()); if drop { $entry.remove_entry(); } @@ -1351,11 +1367,11 @@ macro_rules! break_chan_entry { } macro_rules! try_chan_entry { - ($self: ident, $res: expr, $channel_state: expr, $entry: expr) => { + ($self: ident, $res: expr, $entry: expr) => { match $res { Ok(res) => res, Err(e) => { - let (drop, res) = convert_chan_err!($self, e, $channel_state.short_to_chan_info, $entry.get_mut(), $entry.key()); + let (drop, res) = convert_chan_err!($self, e, $entry.get_mut(), $entry.key()); if drop { $entry.remove_entry(); } @@ -1366,21 +1382,21 @@ macro_rules! try_chan_entry { } macro_rules! remove_channel { - ($self: expr, $channel_state: expr, $entry: expr) => { + ($self: expr, $entry: expr) => { { let channel = $entry.remove_entry().1; - update_maps_on_chan_removal!($self, $channel_state.short_to_chan_info, channel); + update_maps_on_chan_removal!($self, channel); channel } } } macro_rules! handle_monitor_update_res { - ($self: ident, $err: expr, $short_to_chan_info: expr, $chan: expr, $action_type: path, $resend_raa: expr, $resend_commitment: expr, $resend_channel_ready: expr, $failed_forwards: expr, $failed_fails: expr, $failed_finalized_fulfills: expr, $chan_id: expr) => { + ($self: ident, $err: expr, $chan: expr, $action_type: path, $resend_raa: expr, $resend_commitment: expr, $resend_channel_ready: expr, $failed_forwards: expr, $failed_fails: expr, $failed_finalized_fulfills: expr, $chan_id: expr) => { match $err { ChannelMonitorUpdateStatus::PermanentFailure => { log_error!($self.logger, "Closing channel {} due to monitor update ChannelMonitorUpdateStatus::PermanentFailure", log_bytes!($chan_id[..])); - update_maps_on_chan_removal!($self, $short_to_chan_info, $chan); + update_maps_on_chan_removal!($self, $chan); // TODO: $failed_fails is dropped here, which will cause other channels to hit the // chain in a confused state! We need to move them into the ChannelMonitor which // will be responsible for failing backwards once things confirm on-chain. @@ -1422,47 +1438,65 @@ macro_rules! handle_monitor_update_res { }, } }; - ($self: ident, $err: expr, $channel_state: expr, $entry: expr, $action_type: path, $resend_raa: expr, $resend_commitment: expr, $resend_channel_ready: expr, $failed_forwards: expr, $failed_fails: expr, $failed_finalized_fulfills: expr) => { { - let (res, drop) = handle_monitor_update_res!($self, $err, $channel_state.short_to_chan_info, $entry.get_mut(), $action_type, $resend_raa, $resend_commitment, $resend_channel_ready, $failed_forwards, $failed_fails, $failed_finalized_fulfills, $entry.key()); + ($self: ident, $err: expr, $entry: expr, $action_type: path, $resend_raa: expr, $resend_commitment: expr, $resend_channel_ready: expr, $failed_forwards: expr, $failed_fails: expr, $failed_finalized_fulfills: expr) => { { + let (res, drop) = handle_monitor_update_res!($self, $err, $entry.get_mut(), $action_type, $resend_raa, $resend_commitment, $resend_channel_ready, $failed_forwards, $failed_fails, $failed_finalized_fulfills, $entry.key()); if drop { $entry.remove_entry(); } res } }; - ($self: ident, $err: expr, $channel_state: expr, $entry: expr, $action_type: path, $chan_id: expr, COMMITMENT_UPDATE_ONLY) => { { + ($self: ident, $err: expr, $entry: expr, $action_type: path, $chan_id: expr, COMMITMENT_UPDATE_ONLY) => { { debug_assert!($action_type == RAACommitmentOrder::CommitmentFirst); - handle_monitor_update_res!($self, $err, $channel_state, $entry, $action_type, false, true, false, Vec::new(), Vec::new(), Vec::new(), $chan_id) + handle_monitor_update_res!($self, $err, $entry, $action_type, false, true, false, Vec::new(), Vec::new(), Vec::new(), $chan_id) } }; - ($self: ident, $err: expr, $channel_state: expr, $entry: expr, $action_type: path, $chan_id: expr, NO_UPDATE) => { - handle_monitor_update_res!($self, $err, $channel_state, $entry, $action_type, false, false, false, Vec::new(), Vec::new(), Vec::new(), $chan_id) + ($self: ident, $err: expr, $entry: expr, $action_type: path, $chan_id: expr, NO_UPDATE) => { + handle_monitor_update_res!($self, $err, $entry, $action_type, false, false, false, Vec::new(), Vec::new(), Vec::new(), $chan_id) }; - ($self: ident, $err: expr, $channel_state: expr, $entry: expr, $action_type: path, $resend_channel_ready: expr, OPTIONALLY_RESEND_FUNDING_LOCKED) => { - handle_monitor_update_res!($self, $err, $channel_state, $entry, $action_type, false, false, $resend_channel_ready, Vec::new(), Vec::new(), Vec::new()) + ($self: ident, $err: expr, $entry: expr, $action_type: path, $resend_channel_ready: expr, OPTIONALLY_RESEND_FUNDING_LOCKED) => { + handle_monitor_update_res!($self, $err, $entry, $action_type, false, false, $resend_channel_ready, Vec::new(), Vec::new(), Vec::new()) }; - ($self: ident, $err: expr, $channel_state: expr, $entry: expr, $action_type: path, $resend_raa: expr, $resend_commitment: expr) => { - handle_monitor_update_res!($self, $err, $channel_state, $entry, $action_type, $resend_raa, $resend_commitment, false, Vec::new(), Vec::new(), Vec::new()) + ($self: ident, $err: expr, $entry: expr, $action_type: path, $resend_raa: expr, $resend_commitment: expr) => { + handle_monitor_update_res!($self, $err, $entry, $action_type, $resend_raa, $resend_commitment, false, Vec::new(), Vec::new(), Vec::new()) }; - ($self: ident, $err: expr, $channel_state: expr, $entry: expr, $action_type: path, $resend_raa: expr, $resend_commitment: expr, $failed_forwards: expr, $failed_fails: expr) => { - handle_monitor_update_res!($self, $err, $channel_state, $entry, $action_type, $resend_raa, $resend_commitment, false, $failed_forwards, $failed_fails, Vec::new()) + ($self: ident, $err: expr, $entry: expr, $action_type: path, $resend_raa: expr, $resend_commitment: expr, $failed_forwards: expr, $failed_fails: expr) => { + handle_monitor_update_res!($self, $err, $entry, $action_type, $resend_raa, $resend_commitment, false, $failed_forwards, $failed_fails, Vec::new()) }; } macro_rules! send_channel_ready { - ($short_to_chan_info: expr, $pending_msg_events: expr, $channel: expr, $channel_ready_msg: expr) => { + ($self: ident, $pending_msg_events: expr, $channel: expr, $channel_ready_msg: expr) => {{ $pending_msg_events.push(events::MessageSendEvent::SendChannelReady { node_id: $channel.get_counterparty_node_id(), msg: $channel_ready_msg, }); // Note that we may send a `channel_ready` multiple times for a channel if we reconnect, so // we allow collisions, but we shouldn't ever be updating the channel ID pointed to. - let outbound_alias_insert = $short_to_chan_info.insert($channel.outbound_scid_alias(), ($channel.get_counterparty_node_id(), $channel.channel_id())); + let mut short_to_chan_info = $self.short_to_chan_info.write().unwrap(); + let outbound_alias_insert = short_to_chan_info.insert($channel.outbound_scid_alias(), ($channel.get_counterparty_node_id(), $channel.channel_id())); assert!(outbound_alias_insert.is_none() || outbound_alias_insert.unwrap() == ($channel.get_counterparty_node_id(), $channel.channel_id()), "SCIDs should never collide - ensure you weren't behind the chain tip by a full month when creating channels"); if let Some(real_scid) = $channel.get_short_channel_id() { - let scid_insert = $short_to_chan_info.insert(real_scid, ($channel.get_counterparty_node_id(), $channel.channel_id())); + let scid_insert = short_to_chan_info.insert(real_scid, ($channel.get_counterparty_node_id(), $channel.channel_id())); assert!(scid_insert.is_none() || scid_insert.unwrap() == ($channel.get_counterparty_node_id(), $channel.channel_id()), "SCIDs should never collide - ensure you weren't behind the chain tip by a full month when creating channels"); } + }} +} + +macro_rules! emit_channel_ready_event { + ($self: expr, $channel: expr) => { + if $channel.should_emit_channel_ready_event() { + { + let mut pending_events = $self.pending_events.lock().unwrap(); + pending_events.push(events::Event::ChannelReady { + channel_id: $channel.channel_id(), + user_channel_id: $channel.get_user_id(), + counterparty_node_id: $channel.get_counterparty_node_id(), + channel_type: $channel.get_channel_type().clone(), + }); + } + $channel.set_channel_ready_event_emitted(); + } } } @@ -1500,7 +1534,7 @@ macro_rules! handle_chan_restoration_locked { // Similar to the above, this implies that we're letting the channel_ready fly // before it should be allowed to. assert!(chanmon_update.is_none()); - send_channel_ready!($channel_state.short_to_chan_info, $channel_state.pending_msg_events, $channel_entry.get(), msg); + send_channel_ready!($self, $channel_state.pending_msg_events, $channel_entry.get(), msg); } if let Some(msg) = $announcement_sigs { $channel_state.pending_msg_events.push(events::MessageSendEvent::SendAnnouncementSignatures { @@ -1509,6 +1543,8 @@ macro_rules! handle_chan_restoration_locked { }); } + emit_channel_ready_event!($self, $channel_entry.get_mut()); + let funding_broadcastable: Option = $funding_broadcastable; // Force type-checking to resolve if let Some(monitor_update) = chanmon_update { // We only ever broadcast a funding transaction in response to a funding_signed @@ -1531,7 +1567,7 @@ macro_rules! handle_chan_restoration_locked { if $raa.is_none() { order = RAACommitmentOrder::CommitmentFirst; } - break handle_monitor_update_res!($self, e, $channel_state, $channel_entry, order, $raa.is_some(), true); + break handle_monitor_update_res!($self, e, $channel_entry, order, $raa.is_some(), true); } } } @@ -1592,10 +1628,10 @@ macro_rules! post_handle_chan_restoration { } } } -impl ChannelManager - where M::Target: chain::Watch, +impl ChannelManager + where M::Target: chain::Watch<::Signer>, T::Target: BroadcasterInterface, - K::Target: KeysInterface, + K::Target: KeysInterface, F::Target: FeeEstimator, L::Target: Logger, { @@ -1625,15 +1661,15 @@ impl ChannelMana channel_state: Mutex::new(ChannelHolder{ by_id: HashMap::new(), - short_to_chan_info: HashMap::new(), - claimable_htlcs: HashMap::new(), pending_msg_events: Vec::new(), }), outbound_scid_aliases: Mutex::new(HashSet::new()), pending_inbound_payments: Mutex::new(HashMap::new()), pending_outbound_payments: Mutex::new(HashMap::new()), forward_htlcs: Mutex::new(HashMap::new()), + claimable_htlcs: Mutex::new(HashMap::new()), id_to_peer: Mutex::new(HashMap::new()), + short_to_chan_info: FairRwLock::new(HashMap::new()), our_network_key: keys_manager.get_node_secret(Recipient::Node).unwrap(), our_network_pubkey: PublicKey::from_secret_key(&secp_ctx, &keys_manager.get_node_secret(Recipient::Node).unwrap()), @@ -1761,7 +1797,7 @@ impl ChannelMana Ok(temporary_channel_id) } - fn list_channels_with_filter)) -> bool>(&self, f: Fn) -> Vec { + fn list_channels_with_filter::Signer>)) -> bool>(&self, f: Fn) -> Vec { let mut res = Vec::new(); { let channel_state = self.channel_state.lock().unwrap(); @@ -1843,7 +1879,7 @@ impl ChannelMana } /// Helper function that issues the channel close events - fn issue_channel_close_events(&self, channel: &Channel, closure_reason: ClosureReason) { + fn issue_channel_close_events(&self, channel: &Channel<::Signer>, closure_reason: ClosureReason) { let mut pending_events_lock = self.pending_events.lock().unwrap(); match channel.unbroadcasted_funding() { Some(transaction) => { @@ -1870,14 +1906,16 @@ impl ChannelMana if *counterparty_node_id != chan_entry.get().get_counterparty_node_id(){ return Err(APIError::APIMisuseError { err: "The passed counterparty_node_id doesn't match the channel's counterparty node_id".to_owned() }); } - let per_peer_state = self.per_peer_state.read().unwrap(); - let (shutdown_msg, monitor_update, htlcs) = match per_peer_state.get(&counterparty_node_id) { - Some(peer_state) => { - let peer_state = peer_state.lock().unwrap(); - let their_features = &peer_state.latest_features; - chan_entry.get_mut().get_shutdown(&self.keys_manager, their_features, target_feerate_sats_per_1000_weight)? - }, - None => return Err(APIError::ChannelUnavailable { err: format!("Not connected to node: {}", counterparty_node_id) }), + let (shutdown_msg, monitor_update, htlcs) = { + let per_peer_state = self.per_peer_state.read().unwrap(); + match per_peer_state.get(&counterparty_node_id) { + Some(peer_state) => { + let peer_state = peer_state.lock().unwrap(); + let their_features = &peer_state.latest_features; + chan_entry.get_mut().get_shutdown(&self.keys_manager, their_features, target_feerate_sats_per_1000_weight)? + }, + None => return Err(APIError::ChannelUnavailable { err: format!("Not connected to node: {}", counterparty_node_id) }), + } }; failed_htlcs = htlcs; @@ -1885,9 +1923,9 @@ impl ChannelMana if let Some(monitor_update) = monitor_update { let update_res = self.chain_monitor.update_channel(chan_entry.get().get_funding_txo().unwrap(), monitor_update); let (result, is_permanent) = - handle_monitor_update_res!(self, update_res, channel_state.short_to_chan_info, chan_entry.get_mut(), RAACommitmentOrder::CommitmentFirst, chan_entry.key(), NO_UPDATE); + handle_monitor_update_res!(self, update_res, chan_entry.get_mut(), RAACommitmentOrder::CommitmentFirst, chan_entry.key(), NO_UPDATE); if is_permanent { - remove_channel!(self, channel_state, chan_entry); + remove_channel!(self, chan_entry); break result; } } @@ -1898,7 +1936,7 @@ impl ChannelMana }); if chan_entry.get().is_shutdown() { - let channel = remove_channel!(self, channel_state, chan_entry); + let channel = remove_channel!(self, chan_entry); if let Ok(channel_update) = self.get_channel_update_for_broadcast(&channel) { channel_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { msg: channel_update @@ -1999,7 +2037,7 @@ impl ChannelMana } else { self.issue_channel_close_events(chan.get(),ClosureReason::HolderForceClosed); } - remove_channel!(self, channel_state, chan) + remove_channel!(self, chan) } else { return Err(APIError::ChannelUnavailable{err: "No such channel".to_owned()}); } @@ -2270,8 +2308,8 @@ impl ChannelMana // short_channel_id is non-0 in any ::Forward. if let &PendingHTLCRouting::Forward { ref short_channel_id, .. } = routing { if let Some((err, code, chan_update)) = loop { + let id_option = self.short_to_chan_info.read().unwrap().get(&short_channel_id).cloned(); let mut channel_state = self.channel_state.lock().unwrap(); - let id_option = channel_state.short_to_chan_info.get(&short_channel_id).cloned(); let forwarding_id_opt = match id_option { None => { // unknown_next_peer // Note that this is likely a timing oracle for detecting whether an scid is a @@ -2285,7 +2323,14 @@ impl ChannelMana Some((_cp_id, chan_id)) => Some(chan_id.clone()), }; let chan_update_opt = if let Some(forwarding_id) = forwarding_id_opt { - let chan = channel_state.by_id.get_mut(&forwarding_id).unwrap(); + let chan = match channel_state.by_id.get_mut(&forwarding_id) { + None => { + // Channel was removed. The short_to_chan_info and by_id maps have + // no consistency guarantees. + break Some(("Don't have available channel for forwarding as requested.", 0x4000 | 10, None)); + }, + Some(chan) => chan + }; if !chan.should_announce() && !self.default_configuration.accept_forwards_to_priv_channels { // Note that the behavior here should be identical to the above block - we // should NOT reveal the existence or non-existence of a private channel if @@ -2379,7 +2424,7 @@ impl ChannelMana /// [`MessageSendEvent::BroadcastChannelUpdate`] event. /// /// May be called with channel_state already locked! - fn get_channel_update_for_broadcast(&self, chan: &Channel) -> Result { + fn get_channel_update_for_broadcast(&self, chan: &Channel<::Signer>) -> Result { if !chan.should_announce() { return Err(LightningError { err: "Cannot broadcast a channel_update for a private channel".to_owned(), @@ -2398,7 +2443,7 @@ impl ChannelMana /// and thus MUST NOT be called unless the recipient of the resulting message has already /// provided evidence that they know about the existence of the channel. /// May be called with channel_state already locked! - fn get_channel_update_for_unicast(&self, chan: &Channel) -> Result { + fn get_channel_update_for_unicast(&self, chan: &Channel<::Signer>) -> Result { log_trace!(self.logger, "Attempting to generate channel update for channel {}", log_bytes!(chan.channel_id())); let short_channel_id = match chan.get_short_channel_id().or(chan.latest_inbound_scid_alias()) { None => return Err(LightningError{err: "Channel not yet established".to_owned(), action: msgs::ErrorAction::IgnoreError}), @@ -2407,7 +2452,7 @@ impl ChannelMana self.get_channel_update_for_onion(short_channel_id, chan) } - fn get_channel_update_for_onion(&self, short_channel_id: u64, chan: &Channel) -> Result { + fn get_channel_update_for_onion(&self, short_channel_id: u64, chan: &Channel<::Signer>) -> Result { log_trace!(self.logger, "Generating channel update for channel {}", log_bytes!(chan.channel_id())); let were_node_one = PublicKey::from_secret_key(&self.secp_ctx, &self.our_network_key).serialize()[..] < chan.get_counterparty_node_id().serialize()[..]; @@ -2434,10 +2479,9 @@ impl ChannelMana } // Only public for testing, this should otherwise never be called direcly - pub(crate) fn send_payment_along_path(&self, path: &Vec, payment_params: &Option, payment_hash: &PaymentHash, payment_secret: &Option, total_value: u64, cur_height: u32, payment_id: PaymentId, keysend_preimage: &Option) -> Result<(), APIError> { + pub(crate) fn send_payment_along_path(&self, path: &Vec, payment_params: &Option, payment_hash: &PaymentHash, payment_secret: &Option, total_value: u64, cur_height: u32, payment_id: PaymentId, keysend_preimage: &Option, session_priv_bytes: [u8; 32]) -> Result<(), APIError> { log_trace!(self.logger, "Attempting to send payment for path with next hop {}", path.first().unwrap().short_channel_id); let prng_seed = self.keys_manager.get_secure_random_bytes(); - let session_priv_bytes = self.keys_manager.get_secure_random_bytes(); let session_priv = SecretKey::from_slice(&session_priv_bytes[..]).expect("RNG is busted"); let onion_keys = onion_utils::construct_onion_keys(&self.secp_ctx, &path, &session_priv) @@ -2451,38 +2495,12 @@ impl ChannelMana let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); let err: Result<(), _> = loop { - let mut channel_lock = self.channel_state.lock().unwrap(); - - let mut pending_outbounds = self.pending_outbound_payments.lock().unwrap(); - let payment_entry = pending_outbounds.entry(payment_id); - if let hash_map::Entry::Occupied(payment) = &payment_entry { - if !payment.get().is_retryable() { - return Err(APIError::RouteError { - err: "Payment already completed" - }); - } - } - - let id = match channel_lock.short_to_chan_info.get(&path.first().unwrap().short_channel_id) { + let id = match self.short_to_chan_info.read().unwrap().get(&path.first().unwrap().short_channel_id) { None => return Err(APIError::ChannelUnavailable{err: "No channel available with first hop!".to_owned()}), Some((_cp_id, chan_id)) => chan_id.clone(), }; - macro_rules! insert_outbound_payment { - () => { - let payment = payment_entry.or_insert_with(|| PendingOutboundPayment::Retryable { - session_privs: HashSet::new(), - pending_amt_msat: 0, - pending_fee_msat: Some(0), - payment_hash: *payment_hash, - payment_secret: *payment_secret, - starting_block_height: self.best_block.read().unwrap().height(), - total_msat: total_value, - }); - assert!(payment.insert(session_priv_bytes, path)); - } - } - + let mut channel_lock = self.channel_state.lock().unwrap(); let channel_state = &mut *channel_lock; if let hash_map::Entry::Occupied(mut chan) = channel_state.by_id.entry(id) { match { @@ -2501,19 +2519,17 @@ impl ChannelMana payment_secret: payment_secret.clone(), payment_params: payment_params.clone(), }, onion_packet, &self.logger), - channel_state, chan) + chan) } { Some((update_add, commitment_signed, monitor_update)) => { let update_err = self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), monitor_update); let chan_id = chan.get().channel_id(); match (update_err, - handle_monitor_update_res!(self, update_err, channel_state, chan, + handle_monitor_update_res!(self, update_err, chan, RAACommitmentOrder::CommitmentFirst, false, true)) { (ChannelMonitorUpdateStatus::PermanentFailure, Err(e)) => break Err(e), - (ChannelMonitorUpdateStatus::Completed, Ok(())) => { - insert_outbound_payment!(); - }, + (ChannelMonitorUpdateStatus::Completed, Ok(())) => {}, (ChannelMonitorUpdateStatus::InProgress, Err(_)) => { // Note that MonitorUpdateInProgress here indicates (per function // docs) that we will resend the commitment update once monitor @@ -2521,7 +2537,6 @@ impl ChannelMana // indicating that it is unsafe to retry the payment wholesale, // which we do in the send_payment check for // MonitorUpdateInProgress, below. - insert_outbound_payment!(); // Only do this after possibly break'ing on Perm failure above. return Err(APIError::MonitorUpdateInProgress); }, _ => unreachable!(), @@ -2540,9 +2555,14 @@ impl ChannelMana }, }); }, - None => { insert_outbound_payment!(); }, + None => { }, } - } else { unreachable!(); } + } else { + // The channel was likely removed after we fetched the id from the + // `short_to_chan_info` map, but before we successfully locked the `by_id` map. + // This can occur as no consistency guarantees exists between the two maps. + return Err(APIError::ChannelUnavailable{err: "No channel available with first hop!".to_owned()}); + } return Ok(()); }; @@ -2559,14 +2579,20 @@ impl ChannelMana /// Value parameters are provided via the last hop in route, see documentation for RouteHop /// fields for more info. /// - /// Note that if the payment_hash already exists elsewhere (eg you're sending a duplicative - /// payment), we don't do anything to stop you! We always try to ensure that if the provided - /// next hop knows the preimage to payment_hash they can claim an additional amount as - /// specified in the last hop in the route! Thus, you should probably do your own - /// payment_preimage tracking (which you should already be doing as they represent "proof of - /// payment") and prevent double-sends yourself. + /// If a pending payment is currently in-flight with the same [`PaymentId`] provided, this + /// method will error with an [`APIError::RouteError`]. Note, however, that once a payment + /// is no longer pending (either via [`ChannelManager::abandon_payment`], or handling of an + /// [`Event::PaymentSent`]) LDK will not stop you from sending a second payment with the same + /// [`PaymentId`]. + /// + /// Thus, in order to ensure duplicate payments are not sent, you should implement your own + /// tracking of payments, including state to indicate once a payment has completed. Because you + /// should also ensure that [`PaymentHash`]es are not re-used, for simplicity, you should + /// consider using the [`PaymentHash`] as the key for tracking payments. In that case, the + /// [`PaymentId`] should be a copy of the [`PaymentHash`] bytes. /// - /// May generate SendHTLCs message(s) event on success, which should be relayed. + /// May generate SendHTLCs message(s) event on success, which should be relayed (e.g. via + /// [`PeerManager::process_events`]). /// /// Each path may have a different return value, and PaymentSendValue may return a Vec with /// each entry matching the corresponding-index entry in the route paths, see @@ -2590,14 +2616,55 @@ impl ChannelMana /// newer nodes, it will be provided to you in the invoice. If you do not have one, the Route /// must not contain multiple paths as multi-path payments require a recipient-provided /// payment_secret. + /// /// If a payment_secret *is* provided, we assume that the invoice had the payment_secret feature /// bit set (either as required or as available). If multiple paths are present in the Route, /// we assume the invoice had the basic_mpp feature set. - pub fn send_payment(&self, route: &Route, payment_hash: PaymentHash, payment_secret: &Option) -> Result { - self.send_payment_internal(route, payment_hash, payment_secret, None, None, None) + /// + /// [`Event::PaymentSent`]: events::Event::PaymentSent + /// [`PeerManager::process_events`]: crate::ln::peer_handler::PeerManager::process_events + pub fn send_payment(&self, route: &Route, payment_hash: PaymentHash, payment_secret: &Option, payment_id: PaymentId) -> Result<(), PaymentSendFailure> { + let onion_session_privs = self.add_new_pending_payment(payment_hash, *payment_secret, payment_id, route)?; + self.send_payment_internal(route, payment_hash, payment_secret, None, payment_id, None, onion_session_privs) + } + + #[cfg(test)] + pub(crate) fn test_add_new_pending_payment(&self, payment_hash: PaymentHash, payment_secret: Option, payment_id: PaymentId, route: &Route) -> Result, PaymentSendFailure> { + self.add_new_pending_payment(payment_hash, payment_secret, payment_id, route) + } + + fn add_new_pending_payment(&self, payment_hash: PaymentHash, payment_secret: Option, payment_id: PaymentId, route: &Route) -> Result, PaymentSendFailure> { + let mut onion_session_privs = Vec::with_capacity(route.paths.len()); + for _ in 0..route.paths.len() { + onion_session_privs.push(self.keys_manager.get_secure_random_bytes()); + } + + let mut pending_outbounds = self.pending_outbound_payments.lock().unwrap(); + match pending_outbounds.entry(payment_id) { + hash_map::Entry::Occupied(_) => Err(PaymentSendFailure::ParameterError(APIError::RouteError { + err: "Payment already in progress" + })), + hash_map::Entry::Vacant(entry) => { + let payment = entry.insert(PendingOutboundPayment::Retryable { + session_privs: HashSet::new(), + pending_amt_msat: 0, + pending_fee_msat: Some(0), + payment_hash, + payment_secret, + starting_block_height: self.best_block.read().unwrap().height(), + total_msat: route.get_total_amount(), + }); + + for (path, session_priv_bytes) in route.paths.iter().zip(onion_session_privs.iter()) { + assert!(payment.insert(*session_priv_bytes, path)); + } + + Ok(onion_session_privs) + }, + } } - fn send_payment_internal(&self, route: &Route, payment_hash: PaymentHash, payment_secret: &Option, keysend_preimage: Option, payment_id: Option, recv_value_msat: Option) -> Result { + fn send_payment_internal(&self, route: &Route, payment_hash: PaymentHash, payment_secret: &Option, keysend_preimage: Option, payment_id: PaymentId, recv_value_msat: Option, onion_session_privs: Vec<[u8; 32]>) -> Result<(), PaymentSendFailure> { if route.paths.len() < 1 { return Err(PaymentSendFailure::ParameterError(APIError::RouteError{err: "There must be at least one path to send over"})); } @@ -2607,7 +2674,6 @@ impl ChannelMana let mut total_value = 0; let our_node_id = self.get_our_node_id(); let mut path_errs = Vec::with_capacity(route.paths.len()); - let payment_id = if let Some(id) = payment_id { id } else { PaymentId(self.keys_manager.get_secure_random_bytes()) }; 'path_check: for path in route.paths.iter() { if path.len() < 1 || path.len() > 20 { path_errs.push(Err(APIError::RouteError{err: "Path didn't go anywhere/had bogus size"})); @@ -2632,8 +2698,28 @@ impl ChannelMana let cur_height = self.best_block.read().unwrap().height() + 1; let mut results = Vec::new(); - for path in route.paths.iter() { - results.push(self.send_payment_along_path(&path, &route.payment_params, &payment_hash, payment_secret, total_value, cur_height, payment_id, &keysend_preimage)); + debug_assert_eq!(route.paths.len(), onion_session_privs.len()); + for (path, session_priv) in route.paths.iter().zip(onion_session_privs.into_iter()) { + let mut path_res = self.send_payment_along_path(&path, &route.payment_params, &payment_hash, payment_secret, total_value, cur_height, payment_id, &keysend_preimage, session_priv); + match path_res { + Ok(_) => {}, + Err(APIError::MonitorUpdateInProgress) => { + // While a MonitorUpdateInProgress is an Err(_), the payment is still + // considered "in flight" and we shouldn't remove it from the + // PendingOutboundPayment set. + }, + Err(_) => { + let mut pending_outbounds = self.pending_outbound_payments.lock().unwrap(); + if let Some(payment) = pending_outbounds.get_mut(&payment_id) { + let removed = payment.remove(&session_priv, Some(path)); + debug_assert!(removed, "This can't happen as the payment has an entry for this path added by callers"); + } else { + debug_assert!(false, "This can't happen as the payment was added by callers"); + path_res = Err(APIError::APIMisuseError { err: "Internal error: payment disappeared during processing. Please report this bug!".to_owned() }); + } + } + } + results.push(path_res); } let mut has_ok = false; let mut has_err = false; @@ -2667,12 +2753,13 @@ impl ChannelMana } else { None }, }) } else if has_err { - // If we failed to send any paths, we shouldn't have inserted the new PaymentId into - // our `pending_outbound_payments` map at all. - debug_assert!(self.pending_outbound_payments.lock().unwrap().get(&payment_id).is_none()); + // If we failed to send any paths, we should remove the new PaymentId from the + // `pending_outbound_payments` map, as the user isn't expected to `abandon_payment`. + let removed = self.pending_outbound_payments.lock().unwrap().remove(&payment_id).is_some(); + debug_assert!(removed, "We should always have a pending payment to remove here"); Err(PaymentSendFailure::AllFailedRetrySafe(results.drain(..).map(|r| r.unwrap_err()).collect())) } else { - Ok(payment_id) + Ok(()) } } @@ -2696,44 +2783,55 @@ impl ChannelMana } } + let mut onion_session_privs = Vec::with_capacity(route.paths.len()); + for _ in 0..route.paths.len() { + onion_session_privs.push(self.keys_manager.get_secure_random_bytes()); + } + let (total_msat, payment_hash, payment_secret) = { - let outbounds = self.pending_outbound_payments.lock().unwrap(); - if let Some(payment) = outbounds.get(&payment_id) { - match payment { - PendingOutboundPayment::Retryable { - total_msat, payment_hash, payment_secret, pending_amt_msat, .. - } => { - let retry_amt_msat: u64 = route.paths.iter().map(|path| path.last().unwrap().fee_msat).sum(); - if retry_amt_msat + *pending_amt_msat > *total_msat * (100 + RETRY_OVERFLOW_PERCENTAGE) / 100 { + let mut outbounds = self.pending_outbound_payments.lock().unwrap(); + match outbounds.get_mut(&payment_id) { + Some(payment) => { + let res = match payment { + PendingOutboundPayment::Retryable { + total_msat, payment_hash, payment_secret, pending_amt_msat, .. + } => { + let retry_amt_msat: u64 = route.paths.iter().map(|path| path.last().unwrap().fee_msat).sum(); + if retry_amt_msat + *pending_amt_msat > *total_msat * (100 + RETRY_OVERFLOW_PERCENTAGE) / 100 { + return Err(PaymentSendFailure::ParameterError(APIError::APIMisuseError { + err: format!("retry_amt_msat of {} will put pending_amt_msat (currently: {}) more than 10% over total_payment_amt_msat of {}", retry_amt_msat, pending_amt_msat, total_msat).to_string() + })) + } + (*total_msat, *payment_hash, *payment_secret) + }, + PendingOutboundPayment::Legacy { .. } => { return Err(PaymentSendFailure::ParameterError(APIError::APIMisuseError { - err: format!("retry_amt_msat of {} will put pending_amt_msat (currently: {}) more than 10% over total_payment_amt_msat of {}", retry_amt_msat, pending_amt_msat, total_msat).to_string() + err: "Unable to retry payments that were initially sent on LDK versions prior to 0.0.102".to_string() })) - } - (*total_msat, *payment_hash, *payment_secret) - }, - PendingOutboundPayment::Legacy { .. } => { - return Err(PaymentSendFailure::ParameterError(APIError::APIMisuseError { - err: "Unable to retry payments that were initially sent on LDK versions prior to 0.0.102".to_string() - })) - }, - PendingOutboundPayment::Fulfilled { .. } => { - return Err(PaymentSendFailure::ParameterError(APIError::APIMisuseError { - err: "Payment already completed".to_owned() - })); - }, - PendingOutboundPayment::Abandoned { .. } => { - return Err(PaymentSendFailure::ParameterError(APIError::APIMisuseError { - err: "Payment already abandoned (with some HTLCs still pending)".to_owned() - })); - }, - } - } else { - return Err(PaymentSendFailure::ParameterError(APIError::APIMisuseError { - err: format!("Payment with ID {} not found", log_bytes!(payment_id.0)), - })) + }, + PendingOutboundPayment::Fulfilled { .. } => { + return Err(PaymentSendFailure::ParameterError(APIError::APIMisuseError { + err: "Payment already completed".to_owned() + })); + }, + PendingOutboundPayment::Abandoned { .. } => { + return Err(PaymentSendFailure::ParameterError(APIError::APIMisuseError { + err: "Payment already abandoned (with some HTLCs still pending)".to_owned() + })); + }, + }; + for (path, session_priv_bytes) in route.paths.iter().zip(onion_session_privs.iter()) { + assert!(payment.insert(*session_priv_bytes, path)); + } + res + }, + None => + return Err(PaymentSendFailure::ParameterError(APIError::APIMisuseError { + err: format!("Payment with ID {} not found", log_bytes!(payment_id.0)), + })), } }; - return self.send_payment_internal(route, payment_hash, &payment_secret, None, Some(payment_id), Some(total_msat)).map(|_| ()) + self.send_payment_internal(route, payment_hash, &payment_secret, None, payment_id, Some(total_msat), onion_session_privs) } /// Signals that no further retries for the given payment will occur. @@ -2773,7 +2871,8 @@ impl ChannelMana /// would be able to guess -- otherwise, an intermediate node may claim the payment and it will /// never reach the recipient. /// - /// See [`send_payment`] documentation for more details on the return value of this function. + /// See [`send_payment`] documentation for more details on the return value of this function + /// and idempotency guarantees provided by the [`PaymentId`] key. /// /// Similar to regular payments, you MUST NOT reuse a `payment_preimage` value. See /// [`send_payment`] for more information about the risks of duplicate preimage usage. @@ -2781,14 +2880,16 @@ impl ChannelMana /// Note that `route` must have exactly one path. /// /// [`send_payment`]: Self::send_payment - pub fn send_spontaneous_payment(&self, route: &Route, payment_preimage: Option) -> Result<(PaymentHash, PaymentId), PaymentSendFailure> { + pub fn send_spontaneous_payment(&self, route: &Route, payment_preimage: Option, payment_id: PaymentId) -> Result { let preimage = match payment_preimage { Some(p) => p, None => PaymentPreimage(self.keys_manager.get_secure_random_bytes()), }; let payment_hash = PaymentHash(Sha256::hash(&preimage.0).into_inner()); - match self.send_payment_internal(route, payment_hash, &None, Some(preimage), None, None) { - Ok(payment_id) => Ok((payment_hash, payment_id)), + let onion_session_privs = self.add_new_pending_payment(payment_hash, None, payment_id, &route)?; + + match self.send_payment_internal(route, payment_hash, &None, Some(preimage), payment_id, None, onion_session_privs) { + Ok(()) => Ok(payment_hash), Err(e) => Err(e) } } @@ -2808,9 +2909,10 @@ impl ChannelMana } let route = Route { paths: vec![hops], payment_params: None }; + let onion_session_privs = self.add_new_pending_payment(payment_hash, None, payment_id, &route)?; - match self.send_payment_internal(&route, payment_hash, &None, None, Some(payment_id), None) { - Ok(payment_id) => Ok((payment_hash, payment_id)), + match self.send_payment_internal(&route, payment_hash, &None, None, payment_id, None, onion_session_privs) { + Ok(()) => Ok((payment_hash, payment_id)), Err(e) => Err(e) } } @@ -2832,7 +2934,7 @@ impl ChannelMana /// Handles the generation of a funding transaction, optionally (for tests) with a function /// which checks the correctness of the funding transaction given the associated channel. - fn funding_transaction_generated_intern, &Transaction) -> Result>( + fn funding_transaction_generated_intern::Signer>, &Transaction) -> Result>( &self, temporary_channel_id: &[u8; 32], _counterparty_node_id: &PublicKey, funding_transaction: Transaction, find_funding_output: FundingOutput ) -> Result<(), APIError> { let (chan, msg) = { @@ -3046,12 +3148,9 @@ impl ChannelMana mem::swap(&mut forward_htlcs, &mut self.forward_htlcs.lock().unwrap()); for (short_chan_id, mut pending_forwards) in forward_htlcs { - let mut channel_state_lock = self.channel_state.lock().unwrap(); - let channel_state = &mut *channel_state_lock; if short_chan_id != 0 { - let forward_chan_id = match channel_state.short_to_chan_info.get(&short_chan_id) { - Some((_cp_id, chan_id)) => chan_id.clone(), - None => { + macro_rules! forwarding_channel_not_found { + () => { for forward_info in pending_forwards.drain(..) { match forward_info { HTLCForwardInfo::AddHTLC { prev_short_channel_id, prev_htlc_id, forward_info: PendingHTLCInfo { @@ -3138,136 +3237,148 @@ impl ChannelMana } } } + } + } + let forward_chan_id = match self.short_to_chan_info.read().unwrap().get(&short_chan_id) { + Some((_cp_id, chan_id)) => chan_id.clone(), + None => { + forwarding_channel_not_found!(); continue; } }; - if let hash_map::Entry::Occupied(mut chan) = channel_state.by_id.entry(forward_chan_id) { - let mut add_htlc_msgs = Vec::new(); - let mut fail_htlc_msgs = Vec::new(); - for forward_info in pending_forwards.drain(..) { - match forward_info { - HTLCForwardInfo::AddHTLC { prev_short_channel_id, prev_htlc_id, forward_info: PendingHTLCInfo { - routing: PendingHTLCRouting::Forward { - onion_packet, .. - }, incoming_shared_secret, payment_hash, amt_to_forward, outgoing_cltv_value }, - prev_funding_outpoint } => { - log_trace!(self.logger, "Adding HTLC from short id {} with payment_hash {} to channel with short id {} after delay", prev_short_channel_id, log_bytes!(payment_hash.0), short_chan_id); - let htlc_source = HTLCSource::PreviousHopData(HTLCPreviousHopData { - short_channel_id: prev_short_channel_id, - outpoint: prev_funding_outpoint, - htlc_id: prev_htlc_id, - incoming_packet_shared_secret: incoming_shared_secret, - // Phantom payments are only PendingHTLCRouting::Receive. - phantom_shared_secret: None, - }); - match chan.get_mut().send_htlc(amt_to_forward, payment_hash, outgoing_cltv_value, htlc_source.clone(), onion_packet, &self.logger) { - Err(e) => { - if let ChannelError::Ignore(msg) = e { - log_trace!(self.logger, "Failed to forward HTLC with payment_hash {}: {}", log_bytes!(payment_hash.0), msg); - } else { - panic!("Stated return value requirements in send_htlc() were not met"); - } - let (failure_code, data) = self.get_htlc_temp_fail_err_and_data(0x1000|7, short_chan_id, chan.get()); - failed_forwards.push((htlc_source, payment_hash, - HTLCFailReason::Reason { failure_code, data }, - HTLCDestination::NextHopChannel { node_id: Some(chan.get().get_counterparty_node_id()), channel_id: forward_chan_id } - )); - continue; - }, - Ok(update_add) => { - match update_add { - Some(msg) => { add_htlc_msgs.push(msg); }, - None => { - // Nothing to do here...we're waiting on a remote - // revoke_and_ack before we can add anymore HTLCs. The Channel - // will automatically handle building the update_add_htlc and - // commitment_signed messages when we can. - // TODO: Do some kind of timer to set the channel as !is_live() - // as we don't really want others relying on us relaying through - // this channel currently :/. + let mut channel_state_lock = self.channel_state.lock().unwrap(); + let channel_state = &mut *channel_state_lock; + match channel_state.by_id.entry(forward_chan_id) { + hash_map::Entry::Vacant(_) => { + forwarding_channel_not_found!(); + continue; + }, + hash_map::Entry::Occupied(mut chan) => { + let mut add_htlc_msgs = Vec::new(); + let mut fail_htlc_msgs = Vec::new(); + for forward_info in pending_forwards.drain(..) { + match forward_info { + HTLCForwardInfo::AddHTLC { prev_short_channel_id, prev_htlc_id, forward_info: PendingHTLCInfo { + routing: PendingHTLCRouting::Forward { + onion_packet, .. + }, incoming_shared_secret, payment_hash, amt_to_forward, outgoing_cltv_value }, + prev_funding_outpoint } => { + log_trace!(self.logger, "Adding HTLC from short id {} with payment_hash {} to channel with short id {} after delay", prev_short_channel_id, log_bytes!(payment_hash.0), short_chan_id); + let htlc_source = HTLCSource::PreviousHopData(HTLCPreviousHopData { + short_channel_id: prev_short_channel_id, + outpoint: prev_funding_outpoint, + htlc_id: prev_htlc_id, + incoming_packet_shared_secret: incoming_shared_secret, + // Phantom payments are only PendingHTLCRouting::Receive. + phantom_shared_secret: None, + }); + match chan.get_mut().send_htlc(amt_to_forward, payment_hash, outgoing_cltv_value, htlc_source.clone(), onion_packet, &self.logger) { + Err(e) => { + if let ChannelError::Ignore(msg) = e { + log_trace!(self.logger, "Failed to forward HTLC with payment_hash {}: {}", log_bytes!(payment_hash.0), msg); + } else { + panic!("Stated return value requirements in send_htlc() were not met"); + } + let (failure_code, data) = self.get_htlc_temp_fail_err_and_data(0x1000|7, short_chan_id, chan.get()); + failed_forwards.push((htlc_source, payment_hash, + HTLCFailReason::Reason { failure_code, data }, + HTLCDestination::NextHopChannel { node_id: Some(chan.get().get_counterparty_node_id()), channel_id: forward_chan_id } + )); + continue; + }, + Ok(update_add) => { + match update_add { + Some(msg) => { add_htlc_msgs.push(msg); }, + None => { + // Nothing to do here...we're waiting on a remote + // revoke_and_ack before we can add anymore HTLCs. The Channel + // will automatically handle building the update_add_htlc and + // commitment_signed messages when we can. + // TODO: Do some kind of timer to set the channel as !is_live() + // as we don't really want others relying on us relaying through + // this channel currently :/. + } } } } - } - }, - HTLCForwardInfo::AddHTLC { .. } => { - panic!("short_channel_id != 0 should imply any pending_forward entries are of type Forward"); - }, - HTLCForwardInfo::FailHTLC { htlc_id, err_packet } => { - log_trace!(self.logger, "Failing HTLC back to channel with short id {} (backward HTLC ID {}) after delay", short_chan_id, htlc_id); - match chan.get_mut().get_update_fail_htlc(htlc_id, err_packet, &self.logger) { - Err(e) => { - if let ChannelError::Ignore(msg) = e { - log_trace!(self.logger, "Failed to fail HTLC with ID {} backwards to short_id {}: {}", htlc_id, short_chan_id, msg); - } else { - panic!("Stated return value requirements in get_update_fail_htlc() were not met"); + }, + HTLCForwardInfo::AddHTLC { .. } => { + panic!("short_channel_id != 0 should imply any pending_forward entries are of type Forward"); + }, + HTLCForwardInfo::FailHTLC { htlc_id, err_packet } => { + log_trace!(self.logger, "Failing HTLC back to channel with short id {} (backward HTLC ID {}) after delay", short_chan_id, htlc_id); + match chan.get_mut().get_update_fail_htlc(htlc_id, err_packet, &self.logger) { + Err(e) => { + if let ChannelError::Ignore(msg) = e { + log_trace!(self.logger, "Failed to fail HTLC with ID {} backwards to short_id {}: {}", htlc_id, short_chan_id, msg); + } else { + panic!("Stated return value requirements in get_update_fail_htlc() were not met"); + } + // fail-backs are best-effort, we probably already have one + // pending, and if not that's OK, if not, the channel is on + // the chain and sending the HTLC-Timeout is their problem. + continue; + }, + Ok(Some(msg)) => { fail_htlc_msgs.push(msg); }, + Ok(None) => { + // Nothing to do here...we're waiting on a remote + // revoke_and_ack before we can update the commitment + // transaction. The Channel will automatically handle + // building the update_fail_htlc and commitment_signed + // messages when we can. + // We don't need any kind of timer here as they should fail + // the channel onto the chain if they can't get our + // update_fail_htlc in time, it's not our problem. } - // fail-backs are best-effort, we probably already have one - // pending, and if not that's OK, if not, the channel is on - // the chain and sending the HTLC-Timeout is their problem. - continue; - }, - Ok(Some(msg)) => { fail_htlc_msgs.push(msg); }, - Ok(None) => { - // Nothing to do here...we're waiting on a remote - // revoke_and_ack before we can update the commitment - // transaction. The Channel will automatically handle - // building the update_fail_htlc and commitment_signed - // messages when we can. - // We don't need any kind of timer here as they should fail - // the channel onto the chain if they can't get our - // update_fail_htlc in time, it's not our problem. } - } - }, + }, + } } - } - if !add_htlc_msgs.is_empty() || !fail_htlc_msgs.is_empty() { - let (commitment_msg, monitor_update) = match chan.get_mut().send_commitment(&self.logger) { - Ok(res) => res, - Err(e) => { - // We surely failed send_commitment due to bad keys, in that case - // close channel and then send error message to peer. - let counterparty_node_id = chan.get().get_counterparty_node_id(); - let err: Result<(), _> = match e { - ChannelError::Ignore(_) | ChannelError::Warn(_) => { - panic!("Stated return value requirements in send_commitment() were not met"); - } - ChannelError::Close(msg) => { - log_trace!(self.logger, "Closing channel {} due to Close-required error: {}", log_bytes!(chan.key()[..]), msg); - let mut channel = remove_channel!(self, channel_state, chan); - // ChannelClosed event is generated by handle_error for us. - Err(MsgHandleErrInternal::from_finish_shutdown(msg, channel.channel_id(), channel.get_user_id(), channel.force_shutdown(true), self.get_channel_update_for_broadcast(&channel).ok())) - }, - }; - handle_errors.push((counterparty_node_id, err)); - continue; - } - }; - match self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), monitor_update) { - ChannelMonitorUpdateStatus::Completed => {}, - e => { - handle_errors.push((chan.get().get_counterparty_node_id(), handle_monitor_update_res!(self, e, channel_state, chan, RAACommitmentOrder::CommitmentFirst, false, true))); - continue; + if !add_htlc_msgs.is_empty() || !fail_htlc_msgs.is_empty() { + let (commitment_msg, monitor_update) = match chan.get_mut().send_commitment(&self.logger) { + Ok(res) => res, + Err(e) => { + // We surely failed send_commitment due to bad keys, in that case + // close channel and then send error message to peer. + let counterparty_node_id = chan.get().get_counterparty_node_id(); + let err: Result<(), _> = match e { + ChannelError::Ignore(_) | ChannelError::Warn(_) => { + panic!("Stated return value requirements in send_commitment() were not met"); + } + ChannelError::Close(msg) => { + log_trace!(self.logger, "Closing channel {} due to Close-required error: {}", log_bytes!(chan.key()[..]), msg); + let mut channel = remove_channel!(self, chan); + // ChannelClosed event is generated by handle_error for us. + Err(MsgHandleErrInternal::from_finish_shutdown(msg, channel.channel_id(), channel.get_user_id(), channel.force_shutdown(true), self.get_channel_update_for_broadcast(&channel).ok())) + }, + }; + handle_errors.push((counterparty_node_id, err)); + continue; + } + }; + match self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), monitor_update) { + ChannelMonitorUpdateStatus::Completed => {}, + e => { + handle_errors.push((chan.get().get_counterparty_node_id(), handle_monitor_update_res!(self, e, chan, RAACommitmentOrder::CommitmentFirst, false, true))); + continue; + } } + log_debug!(self.logger, "Forwarding HTLCs resulted in a commitment update with {} HTLCs added and {} HTLCs failed for channel {}", + add_htlc_msgs.len(), fail_htlc_msgs.len(), log_bytes!(chan.get().channel_id())); + channel_state.pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs { + node_id: chan.get().get_counterparty_node_id(), + updates: msgs::CommitmentUpdate { + update_add_htlcs: add_htlc_msgs, + update_fulfill_htlcs: Vec::new(), + update_fail_htlcs: fail_htlc_msgs, + update_fail_malformed_htlcs: Vec::new(), + update_fee: None, + commitment_signed: commitment_msg, + }, + }); } - log_debug!(self.logger, "Forwarding HTLCs resulted in a commitment update with {} HTLCs added and {} HTLCs failed for channel {}", - add_htlc_msgs.len(), fail_htlc_msgs.len(), log_bytes!(chan.get().channel_id())); - channel_state.pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs { - node_id: chan.get().get_counterparty_node_id(), - updates: msgs::CommitmentUpdate { - update_add_htlcs: add_htlc_msgs, - update_fulfill_htlcs: Vec::new(), - update_fail_htlcs: fail_htlc_msgs, - update_fail_malformed_htlcs: Vec::new(), - update_fee: None, - commitment_signed: commitment_msg, - }, - }); } - } else { - unreachable!(); } } else { for forward_info in pending_forwards.drain(..) { @@ -3329,7 +3440,8 @@ impl ChannelMana payment_secret: $payment_data.payment_secret, } }; - let (_, htlcs) = channel_state.claimable_htlcs.entry(payment_hash) + let mut claimable_htlcs = self.claimable_htlcs.lock().unwrap(); + let (_, htlcs) = claimable_htlcs.entry(payment_hash) .or_insert_with(|| (purpose(), Vec::new())); if htlcs.len() == 1 { if let OnionPayload::Spontaneous(_) = htlcs[0].onion_payload { @@ -3397,7 +3509,7 @@ impl ChannelMana check_total_value!(payment_data, payment_preimage); }, OnionPayload::Spontaneous(preimage) => { - match channel_state.claimable_htlcs.entry(payment_hash) { + match self.claimable_htlcs.lock().unwrap().entry(payment_hash) { hash_map::Entry::Vacant(e) => { let purpose = events::PaymentPurpose::SpontaneousPayment(preimage); e.insert((purpose.clone(), vec![claimable_htlc])); @@ -3492,7 +3604,7 @@ impl ChannelMana self.process_background_events(); } - fn update_channel_fee(&self, short_to_chan_info: &mut HashMap, pending_msg_events: &mut Vec, chan_id: &[u8; 32], chan: &mut Channel, new_feerate: u32) -> (bool, NotifyOption, Result<(), MsgHandleErrInternal>) { + fn update_channel_fee(&self, pending_msg_events: &mut Vec, chan_id: &[u8; 32], chan: &mut Channel<::Signer>, new_feerate: u32) -> (bool, NotifyOption, Result<(), MsgHandleErrInternal>) { if !chan.is_outbound() { return (true, NotifyOption::SkipPersist, Ok(())); } // If the feerate has decreased by less than half, don't bother if new_feerate <= chan.get_feerate() && new_feerate * 2 > chan.get_feerate() { @@ -3512,7 +3624,7 @@ impl ChannelMana let res = match chan.send_update_fee_and_commit(new_feerate, &self.logger) { Ok(res) => Ok(res), Err(e) => { - let (drop, res) = convert_chan_err!(self, e, short_to_chan_info, chan, chan_id); + let (drop, res) = convert_chan_err!(self, e, chan, chan_id); if drop { retain_channel = false; } Err(res) } @@ -3535,7 +3647,7 @@ impl ChannelMana Ok(()) }, e => { - let (res, drop) = handle_monitor_update_res!(self, e, short_to_chan_info, chan, RAACommitmentOrder::CommitmentFirst, chan_id, COMMITMENT_UPDATE_ONLY); + let (res, drop) = handle_monitor_update_res!(self, e, chan, RAACommitmentOrder::CommitmentFirst, chan_id, COMMITMENT_UPDATE_ONLY); if drop { retain_channel = false; } res } @@ -3563,9 +3675,8 @@ impl ChannelMana let mut channel_state_lock = self.channel_state.lock().unwrap(); let channel_state = &mut *channel_state_lock; let pending_msg_events = &mut channel_state.pending_msg_events; - let short_to_chan_info = &mut channel_state.short_to_chan_info; channel_state.by_id.retain(|chan_id, chan| { - let (retain_channel, chan_needs_persist, err) = self.update_channel_fee(short_to_chan_info, pending_msg_events, chan_id, chan, new_feerate); + let (retain_channel, chan_needs_persist, err) = self.update_channel_fee(pending_msg_events, chan_id, chan, new_feerate); if chan_needs_persist == NotifyOption::DoPersist { should_persist = NotifyOption::DoPersist; } if err.is_err() { handle_errors.push(err); @@ -3578,6 +3689,45 @@ impl ChannelMana }); } + fn remove_stale_resolved_payments(&self) { + // If an outbound payment was completed, and no pending HTLCs remain, we should remove it + // from the map. However, if we did that immediately when the last payment HTLC is claimed, + // this could race the user making a duplicate send_payment call and our idempotency + // guarantees would be violated. Instead, we wait a few timer ticks to do the actual + // removal. This should be more than sufficient to ensure the idempotency of any + // `send_payment` calls that were made at the same time the `PaymentSent` event was being + // processed. + let mut pending_outbound_payments = self.pending_outbound_payments.lock().unwrap(); + let pending_events = self.pending_events.lock().unwrap(); + pending_outbound_payments.retain(|payment_id, payment| { + if let PendingOutboundPayment::Fulfilled { session_privs, timer_ticks_without_htlcs, .. } = payment { + let mut no_remaining_entries = session_privs.is_empty(); + if no_remaining_entries { + for ev in pending_events.iter() { + match ev { + events::Event::PaymentSent { payment_id: Some(ev_payment_id), .. } | + events::Event::PaymentPathSuccessful { payment_id: ev_payment_id, .. } | + events::Event::PaymentPathFailed { payment_id: Some(ev_payment_id), .. } => { + if payment_id == ev_payment_id { + no_remaining_entries = false; + break; + } + }, + _ => {}, + } + } + } + if no_remaining_entries { + *timer_ticks_without_htlcs += 1; + *timer_ticks_without_htlcs <= IDEMPOTENCY_TIMEOUT_TICKS + } else { + *timer_ticks_without_htlcs = 0; + true + } + } else { true } + }); + } + /// Performs actions which should happen on startup and roughly once per minute thereafter. /// /// This currently includes: @@ -3603,10 +3753,9 @@ impl ChannelMana let mut channel_state_lock = self.channel_state.lock().unwrap(); let channel_state = &mut *channel_state_lock; let pending_msg_events = &mut channel_state.pending_msg_events; - let short_to_chan_info = &mut channel_state.short_to_chan_info; channel_state.by_id.retain(|chan_id, chan| { let counterparty_node_id = chan.get_counterparty_node_id(); - let (retain_channel, chan_needs_persist, err) = self.update_channel_fee(short_to_chan_info, pending_msg_events, chan_id, chan, new_feerate); + let (retain_channel, chan_needs_persist, err) = self.update_channel_fee(pending_msg_events, chan_id, chan, new_feerate); if chan_needs_persist == NotifyOption::DoPersist { should_persist = NotifyOption::DoPersist; } if err.is_err() { handle_errors.push((err, counterparty_node_id)); @@ -3614,7 +3763,7 @@ impl ChannelMana if !retain_channel { return false; } if let Err(e) = chan.timer_check_closing_negotiation_progress() { - let (needs_close, err) = convert_chan_err!(self, e, short_to_chan_info, chan, chan_id); + let (needs_close, err) = convert_chan_err!(self, e, chan, chan_id); handle_errors.push((Err(err), chan.get_counterparty_node_id())); if needs_close { return false; } } @@ -3649,29 +3798,29 @@ impl ChannelMana true }); + } - channel_state.claimable_htlcs.retain(|payment_hash, (_, htlcs)| { - if htlcs.is_empty() { - // This should be unreachable - debug_assert!(false); + self.claimable_htlcs.lock().unwrap().retain(|payment_hash, (_, htlcs)| { + if htlcs.is_empty() { + // This should be unreachable + debug_assert!(false); + return false; + } + if let OnionPayload::Invoice { .. } = htlcs[0].onion_payload { + // Check if we've received all the parts we need for an MPP (the value of the parts adds to total_msat). + // In this case we're not going to handle any timeouts of the parts here. + if htlcs[0].total_msat == htlcs.iter().fold(0, |total, htlc| total + htlc.value) { + return true; + } else if htlcs.into_iter().any(|htlc| { + htlc.timer_ticks += 1; + return htlc.timer_ticks >= MPP_TIMEOUT_TICKS + }) { + timed_out_mpp_htlcs.extend(htlcs.into_iter().map(|htlc| (htlc.prev_hop.clone(), payment_hash.clone()))); return false; } - if let OnionPayload::Invoice { .. } = htlcs[0].onion_payload { - // Check if we've received all the parts we need for an MPP (the value of the parts adds to total_msat). - // In this case we're not going to handle any timeouts of the parts here. - if htlcs[0].total_msat == htlcs.iter().fold(0, |total, htlc| total + htlc.value) { - return true; - } else if htlcs.into_iter().any(|htlc| { - htlc.timer_ticks += 1; - return htlc.timer_ticks >= MPP_TIMEOUT_TICKS - }) { - timed_out_mpp_htlcs.extend(htlcs.into_iter().map(|htlc| (htlc.prev_hop.clone(), payment_hash.clone()))); - return false; - } - } - true - }); - } + } + true + }); for htlc_source in timed_out_mpp_htlcs.drain(..) { let receiver = HTLCDestination::FailedPayment { payment_hash: htlc_source.1 }; @@ -3681,6 +3830,9 @@ impl ChannelMana for (err, counterparty_node_id) in handle_errors.drain(..) { let _ = handle_error!(self, err, counterparty_node_id); } + + self.remove_stale_resolved_payments(); + should_persist }); } @@ -3701,10 +3853,7 @@ impl ChannelMana pub fn fail_htlc_backwards(&self, payment_hash: &PaymentHash) { let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); - let removed_source = { - let mut channel_state = self.channel_state.lock().unwrap(); - channel_state.claimable_htlcs.remove(payment_hash) - }; + let removed_source = self.claimable_htlcs.lock().unwrap().remove(payment_hash); if let Some((_, mut sources)) = removed_source { for htlc in sources.drain(..) { let mut htlc_msat_height_data = byte_utils::be64_to_array(htlc.value).to_vec(); @@ -3723,7 +3872,7 @@ impl ChannelMana /// /// This is for failures on the channel on which the HTLC was *received*, not failures /// forwarding - fn get_htlc_inbound_temp_fail_err_and_data(&self, desired_err_code: u16, chan: &Channel) -> (u16, Vec) { + fn get_htlc_inbound_temp_fail_err_and_data(&self, desired_err_code: u16, chan: &Channel<::Signer>) -> (u16, Vec) { // We can't be sure what SCID was used when relaying inbound towards us, so we have to // guess somewhat. If its a public channel, we figure best to just use the real SCID (as // we're not leaking that we have a channel with the counterparty), otherwise we try to use @@ -3743,7 +3892,7 @@ impl ChannelMana /// Gets an HTLC onion failure code and error data for an `UPDATE` error, given the error code /// that we want to return and a channel. - fn get_htlc_temp_fail_err_and_data(&self, desired_err_code: u16, scid: u64, chan: &Channel) -> (u16, Vec) { + fn get_htlc_temp_fail_err_and_data(&self, desired_err_code: u16, scid: u64, chan: &Channel<::Signer>) -> (u16, Vec) { debug_assert_eq!(desired_err_code & 0x1000, 0x1000); if let Ok(upd) = self.get_channel_update_for_onion(scid, chan) { let mut enc = VecWriter(Vec::with_capacity(upd.serialized_length() + 6)); @@ -4006,7 +4155,7 @@ impl ChannelMana let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); - let removed_source = self.channel_state.lock().unwrap().claimable_htlcs.remove(&payment_hash); + let removed_source = self.claimable_htlcs.lock().unwrap().remove(&payment_hash); if let Some((payment_purpose, mut sources)) = removed_source { assert!(!sources.is_empty()); @@ -4029,10 +4178,19 @@ impl ChannelMana let mut channel_state_lock = self.channel_state.lock().unwrap(); let channel_state = &mut *channel_state_lock; for htlc in sources.iter() { - if let None = channel_state.short_to_chan_info.get(&htlc.prev_hop.short_channel_id) { + let chan_id = match self.short_to_chan_info.read().unwrap().get(&htlc.prev_hop.short_channel_id) { + Some((_cp_id, chan_id)) => chan_id.clone(), + None => { + valid_mpp = false; + break; + } + }; + + if let None = channel_state.by_id.get(&chan_id) { valid_mpp = false; break; } + if expected_amt_msat.is_some() && expected_amt_msat != Some(htlc.total_msat) { log_error!(self.logger, "Somehow ended up with an MPP payment with different total amounts - this should not be reachable!"); debug_assert!(false); @@ -4113,10 +4271,10 @@ impl ChannelMana } } - fn claim_funds_from_hop(&self, channel_state_lock: &mut MutexGuard>, prev_hop: HTLCPreviousHopData, payment_preimage: PaymentPreimage) -> ClaimFundsFromHop { + fn claim_funds_from_hop(&self, channel_state_lock: &mut MutexGuard::Signer>>, prev_hop: HTLCPreviousHopData, payment_preimage: PaymentPreimage) -> ClaimFundsFromHop { //TODO: Delay the claimed_funds relaying just like we do outbound relay! let channel_state = &mut **channel_state_lock; - let chan_id = match channel_state.short_to_chan_info.get(&prev_hop.short_channel_id) { + let chan_id = match self.short_to_chan_info.read().unwrap().get(&prev_hop.short_channel_id) { Some((_cp_id, chan_id)) => chan_id.clone(), None => { return ClaimFundsFromHop::PrevHopForceClosed @@ -4135,7 +4293,7 @@ impl ChannelMana payment_preimage, e); return ClaimFundsFromHop::MonitorUpdateFail( chan.get().get_counterparty_node_id(), - handle_monitor_update_res!(self, e, channel_state, chan, RAACommitmentOrder::CommitmentFirst, false, msgs.is_some()).unwrap_err(), + handle_monitor_update_res!(self, e, chan, RAACommitmentOrder::CommitmentFirst, false, msgs.is_some()).unwrap_err(), Some(htlc_value_msat) ); } @@ -4170,14 +4328,14 @@ impl ChannelMana }, } let counterparty_node_id = chan.get().get_counterparty_node_id(); - let (drop, res) = convert_chan_err!(self, e, channel_state.short_to_chan_info, chan.get_mut(), &chan_id); + let (drop, res) = convert_chan_err!(self, e, chan.get_mut(), &chan_id); if drop { chan.remove_entry(); } return ClaimFundsFromHop::MonitorUpdateFail(counterparty_node_id, res, None); }, } - } else { unreachable!(); } + } else { return ClaimFundsFromHop::PrevHopForceClosed } } fn finalize_claims(&self, mut sources: Vec) { @@ -4198,15 +4356,12 @@ impl ChannelMana } ); } - if payment.get().remaining_parts() == 0 { - payment.remove(); - } } } } } - fn claim_funds_internal(&self, mut channel_state_lock: MutexGuard>, source: HTLCSource, payment_preimage: PaymentPreimage, forwarded_htlc_value_msat: Option, from_onchain: bool, next_channel_id: [u8; 32]) { + fn claim_funds_internal(&self, mut channel_state_lock: MutexGuard::Signer>>, source: HTLCSource, payment_preimage: PaymentPreimage, forwarded_htlc_value_msat: Option, from_onchain: bool, next_channel_id: [u8; 32]) { match source { HTLCSource::OutboundRoute { session_priv, payment_id, path, .. } => { mem::drop(channel_state_lock); @@ -4246,10 +4401,6 @@ impl ChannelMana } ); } - - if payment.get().remaining_parts() == 0 { - payment.remove(); - } } } else { log_trace!(self.logger, "Received duplicative fulfill for HTLC with payment_preimage {}", log_bytes!(payment_preimage.0)); @@ -4432,7 +4583,7 @@ impl ChannelMana } }; channel_state.pending_msg_events.push(send_msg_err_event); - let _ = remove_channel!(self, channel_state, channel); + let _ = remove_channel!(self, channel); return Err(APIError::APIMisuseError { err: "Please use accept_inbound_channel_from_trusted_peer_0conf to accept channels with zero confirmations.".to_owned() }); } @@ -4512,7 +4663,7 @@ impl ChannelMana if chan.get().get_counterparty_node_id() != *counterparty_node_id { return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!".to_owned(), msg.temporary_channel_id)); } - try_chan_entry!(self, chan.get_mut().accept_channel(&msg, &self.default_configuration.channel_handshake_limits, &their_features), channel_state, chan); + try_chan_entry!(self, chan.get_mut().accept_channel(&msg, &self.default_configuration.channel_handshake_limits, &their_features), chan); (chan.get().get_value_satoshis(), chan.get().get_funding_redeemscript().to_v0_p2wsh(), chan.get().get_user_id()) }, hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel".to_owned(), msg.temporary_channel_id)) @@ -4539,7 +4690,7 @@ impl ChannelMana if chan.get().get_counterparty_node_id() != *counterparty_node_id { return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!".to_owned(), msg.temporary_channel_id)); } - (try_chan_entry!(self, chan.get_mut().funding_created(msg, best_block, &self.logger), channel_state, chan), chan.remove()) + (try_chan_entry!(self, chan.get_mut().funding_created(msg, best_block, &self.logger), chan), chan.remove()) }, hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel".to_owned(), msg.temporary_channel_id)) } @@ -4592,7 +4743,7 @@ impl ChannelMana msg: funding_msg, }); if let Some(msg) = channel_ready { - send_channel_ready!(channel_state.short_to_chan_info, channel_state.pending_msg_events, chan, msg); + send_channel_ready!(self, channel_state.pending_msg_events, chan, msg); } e.insert(chan); } @@ -4612,12 +4763,12 @@ impl ChannelMana } let (monitor, funding_tx, channel_ready) = match chan.get_mut().funding_signed(&msg, best_block, &self.logger) { Ok(update) => update, - Err(e) => try_chan_entry!(self, Err(e), channel_state, chan), + Err(e) => try_chan_entry!(self, Err(e), chan), }; match self.chain_monitor.watch_channel(chan.get().get_funding_txo().unwrap(), monitor) { ChannelMonitorUpdateStatus::Completed => {}, e => { - let mut res = handle_monitor_update_res!(self, e, channel_state, chan, RAACommitmentOrder::RevokeAndACKFirst, channel_ready.is_some(), OPTIONALLY_RESEND_FUNDING_LOCKED); + let mut res = handle_monitor_update_res!(self, e, chan, RAACommitmentOrder::RevokeAndACKFirst, channel_ready.is_some(), OPTIONALLY_RESEND_FUNDING_LOCKED); if let Err(MsgHandleErrInternal { ref mut shutdown_finish, .. }) = res { // We weren't able to watch the channel to begin with, so no updates should be made on // it. Previously, full_stack_target found an (unreachable) panic when the @@ -4630,7 +4781,7 @@ impl ChannelMana }, } if let Some(msg) = channel_ready { - send_channel_ready!(channel_state.short_to_chan_info, channel_state.pending_msg_events, chan.get(), msg); + send_channel_ready!(self, channel_state.pending_msg_events, chan.get(), msg); } funding_tx }, @@ -4651,7 +4802,7 @@ impl ChannelMana return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!".to_owned(), msg.channel_id)); } let announcement_sigs_opt = try_chan_entry!(self, chan.get_mut().channel_ready(&msg, self.get_our_node_id(), - self.genesis_hash.clone(), &self.best_block.read().unwrap(), &self.logger), channel_state, chan); + self.genesis_hash.clone(), &self.best_block.read().unwrap(), &self.logger), chan); if let Some(announcement_sigs) = announcement_sigs_opt { log_trace!(self.logger, "Sending announcement_signatures for channel {}", log_bytes!(chan.get().channel_id())); channel_state.pending_msg_events.push(events::MessageSendEvent::SendAnnouncementSignatures { @@ -4672,6 +4823,9 @@ impl ChannelMana }); } } + + emit_channel_ready_event!(self, chan.get_mut()); + Ok(()) }, hash_map::Entry::Vacant(_) => Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel".to_owned(), msg.channel_id)) @@ -4696,16 +4850,16 @@ impl ChannelMana if chan_entry.get().sent_shutdown() { " after we initiated shutdown" } else { "" }); } - let (shutdown, monitor_update, htlcs) = try_chan_entry!(self, chan_entry.get_mut().shutdown(&self.keys_manager, &their_features, &msg), channel_state, chan_entry); + let (shutdown, monitor_update, htlcs) = try_chan_entry!(self, chan_entry.get_mut().shutdown(&self.keys_manager, &their_features, &msg), chan_entry); dropped_htlcs = htlcs; // Update the monitor with the shutdown script if necessary. if let Some(monitor_update) = monitor_update { let update_res = self.chain_monitor.update_channel(chan_entry.get().get_funding_txo().unwrap(), monitor_update); let (result, is_permanent) = - handle_monitor_update_res!(self, update_res, channel_state.short_to_chan_info, chan_entry.get_mut(), RAACommitmentOrder::CommitmentFirst, chan_entry.key(), NO_UPDATE); + handle_monitor_update_res!(self, update_res, chan_entry.get_mut(), RAACommitmentOrder::CommitmentFirst, chan_entry.key(), NO_UPDATE); if is_permanent { - remove_channel!(self, channel_state, chan_entry); + remove_channel!(self, chan_entry); break result; } } @@ -4740,7 +4894,7 @@ impl ChannelMana if chan_entry.get().get_counterparty_node_id() != *counterparty_node_id { return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!".to_owned(), msg.channel_id)); } - let (closing_signed, tx) = try_chan_entry!(self, chan_entry.get_mut().closing_signed(&self.fee_estimator, &msg), channel_state, chan_entry); + let (closing_signed, tx) = try_chan_entry!(self, chan_entry.get_mut().closing_signed(&self.fee_estimator, &msg), chan_entry); if let Some(msg) = closing_signed { channel_state.pending_msg_events.push(events::MessageSendEvent::SendClosingSigned { node_id: counterparty_node_id.clone(), @@ -4753,7 +4907,7 @@ impl ChannelMana // also implies there are no pending HTLCs left on the channel, so we can // fully delete it from tracking (the channel monitor is still around to // watch for old state broadcasts)! - (tx, Some(remove_channel!(self, channel_state, chan_entry))) + (tx, Some(remove_channel!(self, chan_entry))) } else { (tx, None) } }, hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel".to_owned(), msg.channel_id)) @@ -4795,7 +4949,7 @@ impl ChannelMana return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!".to_owned(), msg.channel_id)); } - let create_pending_htlc_status = |chan: &Channel, pending_forward_info: PendingHTLCStatus, error_code: u16| { + let create_pending_htlc_status = |chan: &Channel<::Signer>, pending_forward_info: PendingHTLCStatus, error_code: u16| { // If the update_add is completely bogus, the call will Err and we will close, // but if we've sent a shutdown and they haven't acknowledged it yet, we just // want to reject the new HTLC and fail it backwards instead of forwarding. @@ -4817,7 +4971,7 @@ impl ChannelMana _ => pending_forward_info } }; - try_chan_entry!(self, chan.get_mut().update_add_htlc(&msg, pending_forward_info, create_pending_htlc_status, &self.logger), channel_state, chan); + try_chan_entry!(self, chan.get_mut().update_add_htlc(&msg, pending_forward_info, create_pending_htlc_status, &self.logger), chan); }, hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel".to_owned(), msg.channel_id)) } @@ -4833,7 +4987,7 @@ impl ChannelMana if chan.get().get_counterparty_node_id() != *counterparty_node_id { return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!".to_owned(), msg.channel_id)); } - try_chan_entry!(self, chan.get_mut().update_fulfill_htlc(&msg), channel_state, chan) + try_chan_entry!(self, chan.get_mut().update_fulfill_htlc(&msg), chan) }, hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel".to_owned(), msg.channel_id)) } @@ -4850,7 +5004,7 @@ impl ChannelMana if chan.get().get_counterparty_node_id() != *counterparty_node_id { return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!".to_owned(), msg.channel_id)); } - try_chan_entry!(self, chan.get_mut().update_fail_htlc(&msg, HTLCFailReason::LightningError { err: msg.reason.clone() }), channel_state, chan); + try_chan_entry!(self, chan.get_mut().update_fail_htlc(&msg, HTLCFailReason::LightningError { err: msg.reason.clone() }), chan); }, hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel".to_owned(), msg.channel_id)) } @@ -4867,9 +5021,9 @@ impl ChannelMana } if (msg.failure_code & 0x8000) == 0 { let chan_err: ChannelError = ChannelError::Close("Got update_fail_malformed_htlc with BADONION not set".to_owned()); - try_chan_entry!(self, Err(chan_err), channel_state, chan); + try_chan_entry!(self, Err(chan_err), chan); } - try_chan_entry!(self, chan.get_mut().update_fail_malformed_htlc(&msg, HTLCFailReason::Reason { failure_code: msg.failure_code, data: Vec::new() }), channel_state, chan); + try_chan_entry!(self, chan.get_mut().update_fail_malformed_htlc(&msg, HTLCFailReason::Reason { failure_code: msg.failure_code, data: Vec::new() }), chan); Ok(()) }, hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel".to_owned(), msg.channel_id)) @@ -4886,17 +5040,17 @@ impl ChannelMana } let (revoke_and_ack, commitment_signed, monitor_update) = match chan.get_mut().commitment_signed(&msg, &self.logger) { - Err((None, e)) => try_chan_entry!(self, Err(e), channel_state, chan), + Err((None, e)) => try_chan_entry!(self, Err(e), chan), Err((Some(update), e)) => { assert!(chan.get().is_awaiting_monitor_update()); let _ = self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), update); - try_chan_entry!(self, Err(e), channel_state, chan); + try_chan_entry!(self, Err(e), chan); unreachable!(); }, Ok(res) => res }; let update_res = self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), monitor_update); - if let Err(e) = handle_monitor_update_res!(self, update_res, channel_state, chan, RAACommitmentOrder::RevokeAndACKFirst, true, commitment_signed.is_some()) { + if let Err(e) = handle_monitor_update_res!(self, update_res, chan, RAACommitmentOrder::RevokeAndACKFirst, true, commitment_signed.is_some()) { return Err(e); } @@ -4973,7 +5127,7 @@ impl ChannelMana } let was_paused_for_mon_update = chan.get().is_awaiting_monitor_update(); let raa_updates = break_chan_entry!(self, - chan.get_mut().revoke_and_ack(&msg, &self.logger), channel_state, chan); + chan.get_mut().revoke_and_ack(&msg, &self.logger), chan); htlcs_to_fail = raa_updates.holding_cell_failed_htlcs; let update_res = self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), raa_updates.monitor_update); if was_paused_for_mon_update { @@ -4985,7 +5139,7 @@ impl ChannelMana break Err(MsgHandleErrInternal::ignore_no_close("Existing pending monitor update prevented responses to RAA".to_owned())); } if update_res != ChannelMonitorUpdateStatus::Completed { - if let Err(e) = handle_monitor_update_res!(self, update_res, channel_state, chan, + if let Err(e) = handle_monitor_update_res!(self, update_res, chan, RAACommitmentOrder::CommitmentFirst, false, raa_updates.commitment_update.is_some(), false, raa_updates.accepted_htlcs, raa_updates.failed_htlcs, @@ -5033,7 +5187,7 @@ impl ChannelMana if chan.get().get_counterparty_node_id() != *counterparty_node_id { return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!".to_owned(), msg.channel_id)); } - try_chan_entry!(self, chan.get_mut().update_fee(&self.fee_estimator, &msg), channel_state, chan); + try_chan_entry!(self, chan.get_mut().update_fee(&self.fee_estimator, &msg), chan); }, hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel".to_owned(), msg.channel_id)) } @@ -5055,7 +5209,7 @@ impl ChannelMana channel_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelAnnouncement { msg: try_chan_entry!(self, chan.get_mut().announcement_signatures( - self.get_our_node_id(), self.genesis_hash.clone(), self.best_block.read().unwrap().height(), msg), channel_state, chan), + self.get_our_node_id(), self.genesis_hash.clone(), self.best_block.read().unwrap().height(), msg), chan), // Note that announcement_signatures fails if the channel cannot be announced, // so get_channel_update_for_broadcast will never fail by the time we get here. update_msg: self.get_channel_update_for_broadcast(chan.get()).unwrap(), @@ -5068,15 +5222,15 @@ impl ChannelMana /// Returns ShouldPersist if anything changed, otherwise either SkipPersist or an Err. fn internal_channel_update(&self, counterparty_node_id: &PublicKey, msg: &msgs::ChannelUpdate) -> Result { - let mut channel_state_lock = self.channel_state.lock().unwrap(); - let channel_state = &mut *channel_state_lock; - let chan_id = match channel_state.short_to_chan_info.get(&msg.contents.short_channel_id) { + let chan_id = match self.short_to_chan_info.read().unwrap().get(&msg.contents.short_channel_id) { Some((_cp_id, chan_id)) => chan_id.clone(), None => { // It's not a local channel return Ok(NotifyOption::SkipPersist) } }; + let mut channel_state_lock = self.channel_state.lock().unwrap(); + let channel_state = &mut *channel_state_lock; match channel_state.by_id.entry(chan_id) { hash_map::Entry::Occupied(mut chan) => { if chan.get().get_counterparty_node_id() != *counterparty_node_id { @@ -5094,10 +5248,10 @@ impl ChannelMana return Ok(NotifyOption::SkipPersist); } else { log_debug!(self.logger, "Received channel_update for channel {}.", log_bytes!(chan_id)); - try_chan_entry!(self, chan.get_mut().channel_update(&msg), channel_state, chan); + try_chan_entry!(self, chan.get_mut().channel_update(&msg), chan); } }, - hash_map::Entry::Vacant(_) => unreachable!() + hash_map::Entry::Vacant(_) => return Ok(NotifyOption::SkipPersist) } Ok(NotifyOption::DoPersist) } @@ -5119,7 +5273,7 @@ impl ChannelMana // add-HTLCs on disconnect, we may be handed HTLCs to fail backwards here. let responses = try_chan_entry!(self, chan.get_mut().channel_reestablish( msg, &self.logger, self.our_network_pubkey.clone(), self.genesis_hash, - &*self.best_block.read().unwrap()), channel_state, chan); + &*self.best_block.read().unwrap()), chan); let mut channel_update = None; if let Some(msg) = responses.shutdown_msg { channel_state.pending_msg_events.push(events::MessageSendEvent::SendShutdown { @@ -5183,7 +5337,7 @@ impl ChannelMana let by_id = &mut channel_state.by_id; let pending_msg_events = &mut channel_state.pending_msg_events; if let hash_map::Entry::Occupied(chan_entry) = by_id.entry(funding_outpoint.to_channel_id()) { - let mut chan = remove_channel!(self, channel_state, chan_entry); + let mut chan = remove_channel!(self, chan_entry); failed_channels.push(chan.force_shutdown(false)); if let Ok(update) = self.get_channel_update_for_broadcast(&chan) { pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { @@ -5242,7 +5396,6 @@ impl ChannelMana let mut channel_state_lock = self.channel_state.lock().unwrap(); let channel_state = &mut *channel_state_lock; let by_id = &mut channel_state.by_id; - let short_to_chan_info = &mut channel_state.short_to_chan_info; let pending_msg_events = &mut channel_state.pending_msg_events; by_id.retain(|channel_id, chan| { @@ -5265,7 +5418,7 @@ impl ChannelMana }, e => { has_monitor_update = true; - let (res, close_channel) = handle_monitor_update_res!(self, e, short_to_chan_info, chan, RAACommitmentOrder::CommitmentFirst, channel_id, COMMITMENT_UPDATE_ONLY); + let (res, close_channel) = handle_monitor_update_res!(self, e, chan, RAACommitmentOrder::CommitmentFirst, channel_id, COMMITMENT_UPDATE_ONLY); handle_errors.push((chan.get_counterparty_node_id(), res)); if close_channel { return false; } }, @@ -5274,7 +5427,7 @@ impl ChannelMana true }, Err(e) => { - let (close_channel, res) = convert_chan_err!(self, e, short_to_chan_info, chan, channel_id); + let (close_channel, res) = convert_chan_err!(self, e, chan, channel_id); handle_errors.push((chan.get_counterparty_node_id(), Err(res))); // ChannelClosed event is generated by handle_error for us !close_channel @@ -5305,7 +5458,6 @@ impl ChannelMana let mut channel_state_lock = self.channel_state.lock().unwrap(); let channel_state = &mut *channel_state_lock; let by_id = &mut channel_state.by_id; - let short_to_chan_info = &mut channel_state.short_to_chan_info; let pending_msg_events = &mut channel_state.pending_msg_events; by_id.retain(|channel_id, chan| { @@ -5330,13 +5482,13 @@ impl ChannelMana log_info!(self.logger, "Broadcasting {}", log_tx!(tx)); self.tx_broadcaster.broadcast_transaction(&tx); - update_maps_on_chan_removal!(self, short_to_chan_info, chan); + update_maps_on_chan_removal!(self, chan); false } else { true } }, Err(e) => { has_update = true; - let (close_channel, res) = convert_chan_err!(self, e, short_to_chan_info, chan, channel_id); + let (close_channel, res) = convert_chan_err!(self, e, chan, channel_id); handle_errors.push((chan.get_counterparty_node_id(), Err(res))); !close_channel } @@ -5526,14 +5678,14 @@ impl ChannelMana /// /// [phantom node payments]: crate::chain::keysinterface::PhantomKeysManager pub fn get_phantom_scid(&self) -> u64 { - let mut channel_state = self.channel_state.lock().unwrap(); - let best_block = self.best_block.read().unwrap(); + let best_block_height = self.best_block.read().unwrap().height(); + let short_to_chan_info = self.short_to_chan_info.read().unwrap(); loop { - let scid_candidate = fake_scid::Namespace::Phantom.get_fake_scid(best_block.height(), &self.genesis_hash, &self.fake_scid_rand_bytes, &self.keys_manager); + let scid_candidate = fake_scid::Namespace::Phantom.get_fake_scid(best_block_height, &self.genesis_hash, &self.fake_scid_rand_bytes, &self.keys_manager); // Ensure the generated scid doesn't conflict with a real channel. - match channel_state.short_to_chan_info.entry(scid_candidate) { - hash_map::Entry::Occupied(_) => continue, - hash_map::Entry::Vacant(_) => return scid_candidate + match short_to_chan_info.get(&scid_candidate) { + Some(_) => continue, + None => return scid_candidate } } } @@ -5568,10 +5720,10 @@ impl ChannelMana } } -impl MessageSendEventsProvider for ChannelManager - where M::Target: chain::Watch, +impl MessageSendEventsProvider for ChannelManager + where M::Target: chain::Watch<::Signer>, T::Target: BroadcasterInterface, - K::Target: KeysInterface, + K::Target: KeysInterface, F::Target: FeeEstimator, L::Target: Logger, { @@ -5607,11 +5759,11 @@ impl MessageSend } } -impl EventsProvider for ChannelManager +impl EventsProvider for ChannelManager where - M::Target: chain::Watch, + M::Target: chain::Watch<::Signer>, T::Target: BroadcasterInterface, - K::Target: KeysInterface, + K::Target: KeysInterface, F::Target: FeeEstimator, L::Target: Logger, { @@ -5643,11 +5795,11 @@ where } } -impl chain::Listen for ChannelManager +impl chain::Listen for ChannelManager where - M::Target: chain::Watch, + M::Target: chain::Watch<::Signer>, T::Target: BroadcasterInterface, - K::Target: KeysInterface, + K::Target: KeysInterface, F::Target: FeeEstimator, L::Target: Logger, { @@ -5680,11 +5832,11 @@ where } } -impl chain::Confirm for ChannelManager +impl chain::Confirm for ChannelManager where - M::Target: chain::Watch, + M::Target: chain::Watch<::Signer>, T::Target: BroadcasterInterface, - K::Target: KeysInterface, + K::Target: KeysInterface, F::Target: FeeEstimator, L::Target: Logger, { @@ -5742,26 +5894,11 @@ where payment_secrets.retain(|_, inbound_payment| { inbound_payment.expiry_time > header.time as u64 }); - - let mut outbounds = self.pending_outbound_payments.lock().unwrap(); - let mut pending_events = self.pending_events.lock().unwrap(); - outbounds.retain(|payment_id, payment| { - if payment.remaining_parts() != 0 { return true } - if let PendingOutboundPayment::Retryable { starting_block_height, payment_hash, .. } = payment { - if *starting_block_height + PAYMENT_EXPIRY_BLOCKS <= height { - log_info!(self.logger, "Timing out payment with id {} and hash {}", log_bytes!(payment_id.0), log_bytes!(payment_hash.0)); - pending_events.push(events::Event::PaymentFailed { - payment_id: *payment_id, payment_hash: *payment_hash, - }); - false - } else { true } - } else { true } - }); } fn get_relevant_txids(&self) -> Vec { let channel_state = self.channel_state.lock().unwrap(); - let mut res = Vec::with_capacity(channel_state.short_to_chan_info.len()); + let mut res = Vec::with_capacity(channel_state.by_id.len()); for chan in channel_state.by_id.values() { if let Some(funding_txo) = chan.get_funding_txo() { res.push(funding_txo.txid); @@ -5782,18 +5919,18 @@ where } } -impl ChannelManager +impl ChannelManager where - M::Target: chain::Watch, + M::Target: chain::Watch<::Signer>, T::Target: BroadcasterInterface, - K::Target: KeysInterface, + K::Target: KeysInterface, F::Target: FeeEstimator, L::Target: Logger, { /// Calls a function which handles an on-chain event (blocks dis/connected, transactions /// un/confirmed, etc) on each channel, handling any resulting errors or messages generated by /// the function. - fn do_chain_event) -> Result<(Option, Vec<(HTLCSource, PaymentHash)>, Option), ClosureReason>> + fn do_chain_event::Signer>) -> Result<(Option, Vec<(HTLCSource, PaymentHash)>, Option), ClosureReason>> (&self, height_opt: Option, f: FN) { // Note that we MUST NOT end up calling methods on self.chain_monitor here - we're called // during initialization prior to the chain_monitor being fully configured in some cases. @@ -5804,7 +5941,6 @@ where { let mut channel_lock = self.channel_state.lock().unwrap(); let channel_state = &mut *channel_lock; - let short_to_chan_info = &mut channel_state.short_to_chan_info; let pending_msg_events = &mut channel_state.pending_msg_events; channel_state.by_id.retain(|_, channel| { let res = f(channel); @@ -5816,7 +5952,7 @@ where }, HTLCDestination::NextHopChannel { node_id: Some(channel.get_counterparty_node_id()), channel_id: channel.channel_id() })); } if let Some(channel_ready) = channel_ready_opt { - send_channel_ready!(short_to_chan_info, pending_msg_events, channel, channel_ready); + send_channel_ready!(self, pending_msg_events, channel, channel_ready); if channel.is_usable() { log_trace!(self.logger, "Sending channel_ready with private initial channel_update for our counterparty on channel {}", log_bytes!(channel.channel_id())); if let Ok(msg) = self.get_channel_update_for_unicast(channel) { @@ -5829,6 +5965,9 @@ where log_trace!(self.logger, "Sending channel_ready WITHOUT channel_update for {}", log_bytes!(channel.channel_id())); } } + + emit_channel_ready_event!(self, channel); + if let Some(announcement_sigs) = announcement_sigs { log_trace!(self.logger, "Sending announcement_signatures for channel {}", log_bytes!(channel.channel_id())); pending_msg_events.push(events::MessageSendEvent::SendAnnouncementSignatures { @@ -5854,6 +5993,7 @@ where // enforce option_scid_alias then), and if the funding tx is ever // un-confirmed we force-close the channel, ensuring short_to_chan_info // is always consistent. + let mut short_to_chan_info = self.short_to_chan_info.write().unwrap(); let scid_insert = short_to_chan_info.insert(real_scid, (channel.get_counterparty_node_id(), channel.channel_id())); assert!(scid_insert.is_none() || scid_insert.unwrap() == (channel.get_counterparty_node_id(), channel.channel_id()), "SCIDs should never collide - ensure you weren't behind by a full {} blocks when creating channels", @@ -5861,7 +6001,7 @@ where } } } else if let Err(reason) = res { - update_maps_on_chan_removal!(self, short_to_chan_info, channel); + update_maps_on_chan_removal!(self, channel); // It looks like our counterparty went on-chain or funding transaction was // reorged out of the main chain. Close the channel. failed_channels.push(channel.force_shutdown(true)); @@ -5883,28 +6023,28 @@ where } true }); + } - if let Some(height) = height_opt { - channel_state.claimable_htlcs.retain(|payment_hash, (_, htlcs)| { - htlcs.retain(|htlc| { - // If height is approaching the number of blocks we think it takes us to get - // our commitment transaction confirmed before the HTLC expires, plus the - // number of blocks we generally consider it to take to do a commitment update, - // just give up on it and fail the HTLC. - if height >= htlc.cltv_expiry - HTLC_FAIL_BACK_BUFFER { - let mut htlc_msat_height_data = byte_utils::be64_to_array(htlc.value).to_vec(); - htlc_msat_height_data.extend_from_slice(&byte_utils::be32_to_array(height)); - - timed_out_htlcs.push((HTLCSource::PreviousHopData(htlc.prev_hop.clone()), payment_hash.clone(), HTLCFailReason::Reason { - failure_code: 0x4000 | 15, - data: htlc_msat_height_data - }, HTLCDestination::FailedPayment { payment_hash: payment_hash.clone() })); - false - } else { true } - }); - !htlcs.is_empty() // Only retain this entry if htlcs has at least one entry. + if let Some(height) = height_opt { + self.claimable_htlcs.lock().unwrap().retain(|payment_hash, (_, htlcs)| { + htlcs.retain(|htlc| { + // If height is approaching the number of blocks we think it takes us to get + // our commitment transaction confirmed before the HTLC expires, plus the + // number of blocks we generally consider it to take to do a commitment update, + // just give up on it and fail the HTLC. + if height >= htlc.cltv_expiry - HTLC_FAIL_BACK_BUFFER { + let mut htlc_msat_height_data = byte_utils::be64_to_array(htlc.value).to_vec(); + htlc_msat_height_data.extend_from_slice(&byte_utils::be32_to_array(height)); + + timed_out_htlcs.push((HTLCSource::PreviousHopData(htlc.prev_hop.clone()), payment_hash.clone(), HTLCFailReason::Reason { + failure_code: 0x4000 | 15, + data: htlc_msat_height_data + }, HTLCDestination::FailedPayment { payment_hash: payment_hash.clone() })); + false + } else { true } }); - } + !htlcs.is_empty() // Only retain this entry if htlcs has at least one entry. + }); } self.handle_init_event_channel_failures(failed_channels); @@ -5951,11 +6091,11 @@ where } } -impl - ChannelMessageHandler for ChannelManager - where M::Target: chain::Watch, +impl + ChannelMessageHandler for ChannelManager + where M::Target: chain::Watch<::Signer>, T::Target: BroadcasterInterface, - K::Target: KeysInterface, + K::Target: KeysInterface, F::Target: FeeEstimator, L::Target: Logger, { @@ -6057,14 +6197,13 @@ impl let mut channel_state_lock = self.channel_state.lock().unwrap(); let channel_state = &mut *channel_state_lock; let pending_msg_events = &mut channel_state.pending_msg_events; - let short_to_chan_info = &mut channel_state.short_to_chan_info; log_debug!(self.logger, "Marking channels with {} disconnected and generating channel_updates. We believe we {} make future connections to this peer.", log_pubkey!(counterparty_node_id), if no_connection_possible { "cannot" } else { "can" }); channel_state.by_id.retain(|_, chan| { if chan.get_counterparty_node_id() == *counterparty_node_id { chan.remove_uncommitted_htlcs_and_mark_paused(&self.logger); if chan.is_shutdown() { - update_maps_on_chan_removal!(self, short_to_chan_info, chan); + update_maps_on_chan_removal!(self, chan); self.issue_channel_close_events(chan, ClosureReason::DisconnectedPeer); return false; } else { @@ -6428,7 +6567,7 @@ impl Writeable for ClaimableHTLC { impl Readable for ClaimableHTLC { fn read(reader: &mut R) -> Result { - let mut prev_hop = ::util::ser::OptionDeserWrapper(None); + let mut prev_hop = crate::util::ser::OptionDeserWrapper(None); let mut value = 0; let mut payment_data: Option = None; let mut cltv_expiry = 0; @@ -6478,7 +6617,7 @@ impl Readable for HTLCSource { let id: u8 = Readable::read(reader)?; match id { 0 => { - let mut session_priv: ::util::ser::OptionDeserWrapper = ::util::ser::OptionDeserWrapper(None); + let mut session_priv: crate::util::ser::OptionDeserWrapper = crate::util::ser::OptionDeserWrapper(None); let mut first_hop_htlc_msat: u64 = 0; let mut path = Some(Vec::new()); let mut payment_id = None; @@ -6513,7 +6652,7 @@ impl Readable for HTLCSource { } impl Writeable for HTLCSource { - fn write(&self, writer: &mut W) -> Result<(), ::io::Error> { + fn write(&self, writer: &mut W) -> Result<(), crate::io::Error> { match self { HTLCSource::OutboundRoute { ref session_priv, ref first_hop_htlc_msat, ref path, payment_id, payment_secret, payment_params } => { 0u8.write(writer)?; @@ -6574,6 +6713,7 @@ impl_writeable_tlv_based_enum_upgradable!(PendingOutboundPayment, (1, Fulfilled) => { (0, session_privs, required), (1, payment_hash, option), + (3, timer_ticks_without_htlcs, (default_value, 0)), }, (2, Retryable) => { (0, session_privs, required), @@ -6590,10 +6730,10 @@ impl_writeable_tlv_based_enum_upgradable!(PendingOutboundPayment, }, ); -impl Writeable for ChannelManager - where M::Target: chain::Watch, +impl Writeable for ChannelManager + where M::Target: chain::Watch<::Signer>, T::Target: BroadcasterInterface, - K::Target: KeysInterface, + K::Target: KeysInterface, F::Target: FeeEstimator, L::Target: Logger, { @@ -6639,10 +6779,13 @@ impl Writeable f } } - let channel_state = self.channel_state.lock().unwrap(); + let pending_inbound_payments = self.pending_inbound_payments.lock().unwrap(); + let claimable_htlcs = self.claimable_htlcs.lock().unwrap(); + let pending_outbound_payments = self.pending_outbound_payments.lock().unwrap(); + let mut htlc_purposes: Vec<&events::PaymentPurpose> = Vec::new(); - (channel_state.claimable_htlcs.len() as u64).write(writer)?; - for (payment_hash, (purpose, previous_hops)) in channel_state.claimable_htlcs.iter() { + (claimable_htlcs.len() as u64).write(writer)?; + for (payment_hash, (purpose, previous_hops)) in claimable_htlcs.iter() { payment_hash.write(writer)?; (previous_hops.len() as u64).write(writer)?; for htlc in previous_hops.iter() { @@ -6659,8 +6802,6 @@ impl Writeable f peer_state.latest_features.write(writer)?; } - let pending_inbound_payments = self.pending_inbound_payments.lock().unwrap(); - let pending_outbound_payments = self.pending_outbound_payments.lock().unwrap(); let events = self.pending_events.lock().unwrap(); (events.len() as u64).write(writer)?; for event in events.iter() { @@ -6839,29 +6980,29 @@ impl<'a, Signer: 'a + Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> // Implement ReadableArgs for an Arc'd ChannelManager to make it a bit easier to work with the // SipmleArcChannelManager type: -impl<'a, Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> - ReadableArgs> for (BlockHash, Arc>) - where M::Target: chain::Watch, +impl<'a, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> + ReadableArgs::Signer, M, T, K, F, L>> for (BlockHash, Arc>) + where M::Target: chain::Watch<::Signer>, T::Target: BroadcasterInterface, - K::Target: KeysInterface, + K::Target: KeysInterface, F::Target: FeeEstimator, L::Target: Logger, { - fn read(reader: &mut R, args: ChannelManagerReadArgs<'a, Signer, M, T, K, F, L>) -> Result { - let (blockhash, chan_manager) = <(BlockHash, ChannelManager)>::read(reader, args)?; + fn read(reader: &mut R, args: ChannelManagerReadArgs<'a, ::Signer, M, T, K, F, L>) -> Result { + let (blockhash, chan_manager) = <(BlockHash, ChannelManager)>::read(reader, args)?; Ok((blockhash, Arc::new(chan_manager))) } } -impl<'a, Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> - ReadableArgs> for (BlockHash, ChannelManager) - where M::Target: chain::Watch, +impl<'a, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> + ReadableArgs::Signer, M, T, K, F, L>> for (BlockHash, ChannelManager) + where M::Target: chain::Watch<::Signer>, T::Target: BroadcasterInterface, - K::Target: KeysInterface, + K::Target: KeysInterface, F::Target: FeeEstimator, L::Target: Logger, { - fn read(reader: &mut R, mut args: ChannelManagerReadArgs<'a, Signer, M, T, K, F, L>) -> Result { + fn read(reader: &mut R, mut args: ChannelManagerReadArgs<'a, ::Signer, M, T, K, F, L>) -> Result { let _ver = read_ver_prefix!(reader, SERIALIZATION_VERSION); let genesis_hash: BlockHash = Readable::read(reader)?; @@ -6877,7 +7018,7 @@ impl<'a, Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> let mut short_to_chan_info = HashMap::with_capacity(cmp::min(channel_count as usize, 128)); let mut channel_closures = Vec::new(); for _ in 0..channel_count { - let mut channel: Channel = Channel::read(reader, (&args.keys_manager, best_block_height))?; + let mut channel: Channel<::Signer> = Channel::read(reader, (&args.keys_manager, best_block_height))?; let funding_txo = channel.get_funding_txo().ok_or(DecodeError::InvalidValue)?; funding_txo_set.insert(funding_txo.clone()); if let Some(ref mut monitor) = args.channel_monitors.get_mut(&funding_txo) { @@ -7253,8 +7394,6 @@ impl<'a, Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> channel_state: Mutex::new(ChannelHolder { by_id, - short_to_chan_info, - claimable_htlcs, pending_msg_events: Vec::new(), }), inbound_payment_key: expanded_inbound_key, @@ -7262,8 +7401,10 @@ impl<'a, Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> pending_outbound_payments: Mutex::new(pending_outbound_payments.unwrap()), forward_htlcs: Mutex::new(forward_htlcs), + claimable_htlcs: Mutex::new(claimable_htlcs), outbound_scid_aliases: Mutex::new(outbound_scid_aliases), id_to_peer: Mutex::new(id_to_peer), + short_to_chan_info: FairRwLock::new(short_to_chan_info), fake_scid_rand_bytes: fake_scid_rand_bytes.unwrap(), probing_cookie_secret: probing_cookie_secret.unwrap(), @@ -7305,16 +7446,16 @@ mod tests { use bitcoin::hashes::sha256::Hash as Sha256; use core::time::Duration; use core::sync::atomic::Ordering; - use ln::{PaymentPreimage, PaymentHash, PaymentSecret}; - use ln::channelmanager::{self, inbound_payment, PaymentId, PaymentSendFailure}; - use ln::functional_test_utils::*; - use ln::msgs; - use ln::msgs::ChannelMessageHandler; - use routing::router::{PaymentParameters, RouteParameters, find_route}; - use util::errors::APIError; - use util::events::{Event, HTLCDestination, MessageSendEvent, MessageSendEventsProvider, ClosureReason}; - use util::test_utils; - use chain::keysinterface::KeysInterface; + use crate::ln::{PaymentPreimage, PaymentHash, PaymentSecret}; + use crate::ln::channelmanager::{self, inbound_payment, PaymentId, PaymentSendFailure}; + use crate::ln::functional_test_utils::*; + use crate::ln::msgs; + use crate::ln::msgs::ChannelMessageHandler; + use crate::routing::router::{PaymentParameters, RouteParameters, find_route}; + use crate::util::errors::APIError; + use crate::util::events::{Event, HTLCDestination, MessageSendEvent, MessageSendEventsProvider, ClosureReason}; + use crate::util::test_utils; + use crate::chain::keysinterface::KeysInterface; #[test] fn test_notify_limits() { @@ -7406,18 +7547,22 @@ mod tests { // First, send a partial MPP payment. let (route, our_payment_hash, payment_preimage, payment_secret) = get_route_and_payment_hash!(&nodes[0], nodes[1], 100_000); + let mut mpp_route = route.clone(); + mpp_route.paths.push(mpp_route.paths[0].clone()); + let payment_id = PaymentId([42; 32]); // Use the utility function send_payment_along_path to send the payment with MPP data which // indicates there are more HTLCs coming. let cur_height = CHAN_CONFIRM_DEPTH + 1; // route_payment calls send_payment, which adds 1 to the current height. So we do the same here to match. - nodes[0].node.send_payment_along_path(&route.paths[0], &route.payment_params, &our_payment_hash, &Some(payment_secret), 200_000, cur_height, payment_id, &None).unwrap(); + let session_privs = nodes[0].node.add_new_pending_payment(our_payment_hash, Some(payment_secret), payment_id, &mpp_route).unwrap(); + nodes[0].node.send_payment_along_path(&mpp_route.paths[0], &route.payment_params, &our_payment_hash, &Some(payment_secret), 200_000, cur_height, payment_id, &None, session_privs[0]).unwrap(); check_added_monitors!(nodes[0], 1); let mut events = nodes[0].node.get_and_clear_pending_msg_events(); assert_eq!(events.len(), 1); pass_along_path(&nodes[0], &[&nodes[1]], 200_000, our_payment_hash, Some(payment_secret), events.drain(..).next().unwrap(), false, None); // Next, send a keysend payment with the same payment_hash and make sure it fails. - nodes[0].node.send_spontaneous_payment(&route, Some(payment_preimage)).unwrap(); + nodes[0].node.send_spontaneous_payment(&route, Some(payment_preimage), PaymentId(payment_preimage.0)).unwrap(); check_added_monitors!(nodes[0], 1); let mut events = nodes[0].node.get_and_clear_pending_msg_events(); assert_eq!(events.len(), 1); @@ -7440,7 +7585,7 @@ mod tests { expect_payment_failed!(nodes[0], our_payment_hash, true); // Send the second half of the original MPP payment. - nodes[0].node.send_payment_along_path(&route.paths[0], &route.payment_params, &our_payment_hash, &Some(payment_secret), 200_000, cur_height, payment_id, &None).unwrap(); + nodes[0].node.send_payment_along_path(&mpp_route.paths[1], &route.payment_params, &our_payment_hash, &Some(payment_secret), 200_000, cur_height, payment_id, &None, session_privs[1]).unwrap(); check_added_monitors!(nodes[0], 1); let mut events = nodes[0].node.get_and_clear_pending_msg_events(); assert_eq!(events.len(), 1); @@ -7538,7 +7683,7 @@ mod tests { &nodes[0].node.get_our_node_id(), &route_params, &nodes[0].network_graph, None, nodes[0].logger, &scorer, &random_seed_bytes ).unwrap(); - nodes[0].node.send_spontaneous_payment(&route, Some(payment_preimage)).unwrap(); + nodes[0].node.send_spontaneous_payment(&route, Some(payment_preimage), PaymentId(payment_preimage.0)).unwrap(); check_added_monitors!(nodes[0], 1); let mut events = nodes[0].node.get_and_clear_pending_msg_events(); assert_eq!(events.len(), 1); @@ -7571,7 +7716,7 @@ mod tests { &nodes[0].node.get_our_node_id(), &route_params, &nodes[0].network_graph, None, nodes[0].logger, &scorer, &random_seed_bytes ).unwrap(); - let (payment_hash, _) = nodes[0].node.send_spontaneous_payment(&route, Some(payment_preimage)).unwrap(); + let payment_hash = nodes[0].node.send_spontaneous_payment(&route, Some(payment_preimage), PaymentId(payment_preimage.0)).unwrap(); check_added_monitors!(nodes[0], 1); let mut events = nodes[0].node.get_and_clear_pending_msg_events(); assert_eq!(events.len(), 1); @@ -7581,7 +7726,7 @@ mod tests { // Next, attempt a regular payment and make sure it fails. let payment_secret = PaymentSecret([43; 32]); - nodes[0].node.send_payment(&route, payment_hash, &Some(payment_secret)).unwrap(); + nodes[0].node.send_payment(&route, payment_hash, &Some(payment_secret), PaymentId(payment_hash.0)).unwrap(); check_added_monitors!(nodes[0], 1); let mut events = nodes[0].node.get_and_clear_pending_msg_events(); assert_eq!(events.len(), 1); @@ -7624,7 +7769,7 @@ mod tests { let _chan = create_chan_between_nodes(&nodes[0], &nodes[1], channelmanager::provided_init_features(), channelmanager::provided_init_features()); let route_params = RouteParameters { payment_params: PaymentParameters::for_keysend(payee_pubkey), - final_value_msat: 10000, + final_value_msat: 10_000, final_cltv_expiry_delta: 40, }; let network_graph = nodes[0].network_graph; @@ -7638,7 +7783,8 @@ mod tests { let test_preimage = PaymentPreimage([42; 32]); let mismatch_payment_hash = PaymentHash([43; 32]); - let _ = nodes[0].node.send_payment_internal(&route, mismatch_payment_hash, &None, Some(test_preimage), None, None).unwrap(); + let session_privs = nodes[0].node.add_new_pending_payment(mismatch_payment_hash, None, PaymentId(mismatch_payment_hash.0), &route).unwrap(); + nodes[0].node.send_payment_internal(&route, mismatch_payment_hash, &None, Some(test_preimage), PaymentId(mismatch_payment_hash.0), None, session_privs).unwrap(); check_added_monitors!(nodes[0], 1); let updates = get_htlc_update_msgs!(nodes[0], nodes[1].node.get_our_node_id()); @@ -7668,7 +7814,7 @@ mod tests { let _chan = create_chan_between_nodes(&nodes[0], &nodes[1], channelmanager::provided_init_features(), channelmanager::provided_init_features()); let route_params = RouteParameters { payment_params: PaymentParameters::for_keysend(payee_pubkey), - final_value_msat: 10000, + final_value_msat: 10_000, final_cltv_expiry_delta: 40, }; let network_graph = nodes[0].network_graph; @@ -7683,7 +7829,8 @@ mod tests { let test_preimage = PaymentPreimage([42; 32]); let test_secret = PaymentSecret([43; 32]); let payment_hash = PaymentHash(Sha256::hash(&test_preimage.0).into_inner()); - let _ = nodes[0].node.send_payment_internal(&route, payment_hash, &Some(test_secret), Some(test_preimage), None, None).unwrap(); + let session_privs = nodes[0].node.add_new_pending_payment(payment_hash, Some(test_secret), PaymentId(payment_hash.0), &route).unwrap(); + nodes[0].node.send_payment_internal(&route, payment_hash, &Some(test_secret), Some(test_preimage), PaymentId(payment_hash.0), None, session_privs).unwrap(); check_added_monitors!(nodes[0], 1); let updates = get_htlc_update_msgs!(nodes[0], nodes[1].node.get_our_node_id()); @@ -7720,7 +7867,7 @@ mod tests { route.paths[1][0].short_channel_id = chan_2_id; route.paths[1][1].short_channel_id = chan_4_id; - match nodes[0].node.send_payment(&route, payment_hash, &None).unwrap_err() { + match nodes[0].node.send_payment(&route, payment_hash, &None, PaymentId(payment_hash.0)).unwrap_err() { PaymentSendFailure::ParameterError(APIError::APIMisuseError { ref err }) => { assert!(regex::Regex::new(r"Payment secret is required for multi-path payments").unwrap().is_match(err)) }, _ => panic!("unexpected error") @@ -7872,34 +8019,33 @@ mod tests { #[cfg(all(any(test, feature = "_test_utils"), feature = "_bench_unstable"))] pub mod bench { - use chain::Listen; - use chain::chainmonitor::{ChainMonitor, Persist}; - use chain::keysinterface::{KeysManager, KeysInterface, InMemorySigner}; - use ln::channelmanager::{self, BestBlock, ChainParameters, ChannelManager, PaymentHash, PaymentPreimage}; - use ln::features::{InitFeatures, InvoiceFeatures}; - use ln::functional_test_utils::*; - use ln::msgs::{ChannelMessageHandler, Init}; - use routing::gossip::NetworkGraph; - use routing::router::{PaymentParameters, get_route}; - use util::test_utils; - use util::config::UserConfig; - use util::events::{Event, MessageSendEvent, MessageSendEventsProvider}; + use crate::chain::Listen; + use crate::chain::chainmonitor::{ChainMonitor, Persist}; + use crate::chain::keysinterface::{KeysManager, KeysInterface, InMemorySigner}; + use crate::ln::channelmanager::{self, BestBlock, ChainParameters, ChannelManager, PaymentHash, PaymentPreimage, PaymentId}; + use crate::ln::functional_test_utils::*; + use crate::ln::msgs::{ChannelMessageHandler, Init}; + use crate::routing::gossip::NetworkGraph; + use crate::routing::router::{PaymentParameters, get_route}; + use crate::util::test_utils; + use crate::util::config::UserConfig; + use crate::util::events::{Event, MessageSendEvent, MessageSendEventsProvider}; use bitcoin::hashes::Hash; use bitcoin::hashes::sha256::Hash as Sha256; use bitcoin::{Block, BlockHeader, PackedLockTime, Transaction, TxMerkleNode, TxOut}; - use sync::{Arc, Mutex}; + use crate::sync::{Arc, Mutex}; use test::Bencher; struct NodeHolder<'a, P: Persist> { - node: &'a ChannelManager, &'a test_utils::TestBroadcaster, &'a KeysManager, - &'a test_utils::TestFeeEstimator, &'a test_utils::TestLogger> + &'a test_utils::TestFeeEstimator, &'a test_utils::TestLogger>, } #[cfg(test)] @@ -7982,6 +8128,24 @@ pub mod bench { _ => panic!(), } + let events_a = node_a.get_and_clear_pending_events(); + assert_eq!(events_a.len(), 1); + match events_a[0] { + Event::ChannelReady{ ref counterparty_node_id, .. } => { + assert_eq!(*counterparty_node_id, node_b.get_our_node_id()); + }, + _ => panic!("Unexpected event"), + } + + let events_b = node_b.get_and_clear_pending_events(); + assert_eq!(events_b.len(), 1); + match events_b[0] { + Event::ChannelReady{ ref counterparty_node_id, .. } => { + assert_eq!(*counterparty_node_id, node_a.get_our_node_id()); + }, + _ => panic!("Unexpected event"), + } + let dummy_graph = NetworkGraph::new(genesis_hash, &logger_a); let mut payment_count: u64 = 0; @@ -8003,7 +8167,7 @@ pub mod bench { let payment_hash = PaymentHash(Sha256::hash(&payment_preimage.0[..]).into_inner()); let payment_secret = $node_b.create_inbound_payment_for_hash(payment_hash, None, 7200).unwrap(); - $node_a.send_payment(&route, payment_hash, &Some(payment_secret)).unwrap(); + $node_a.send_payment(&route, payment_hash, &Some(payment_secret), PaymentId(payment_hash.0)).unwrap(); let payment_event = SendEvent::from_event($node_a.get_and_clear_pending_msg_events().pop().unwrap()); $node_b.handle_update_add_htlc(&$node_a.get_our_node_id(), &payment_event.msgs[0]); $node_b.handle_commitment_signed(&$node_a.get_our_node_id(), &payment_event.commitment_msg);