diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs
index 9e087785..b22dacdd 100644
--- a/lightning/src/ln/channelmanager.rs
+++ b/lightning/src/ln/channelmanager.rs
@@ -46,14 +46,14 @@ use crate::ln::channel::{Channel, ChannelError, ChannelUpdateStatus, UpdateFulfillCommitFetch};
 use crate::ln::features::{ChannelFeatures, ChannelTypeFeatures, InitFeatures, NodeFeatures};
 #[cfg(any(feature = "_test_utils", test))]
 use crate::ln::features::InvoiceFeatures;
-use crate::routing::router::{PaymentParameters, Route, RouteHop, RoutePath, RouteParameters};
+use crate::routing::router::{InFlightHtlcs, PaymentParameters, Route, RouteHop, RoutePath, RouteParameters};
 use crate::ln::msgs;
 use crate::ln::onion_utils;
 use crate::ln::msgs::{ChannelMessageHandler, DecodeError, LightningError, MAX_VALUE_MSAT};
 use crate::ln::wire::Encode;
 use crate::chain::keysinterface::{Sign, KeysInterface, KeysManager, Recipient};
 use crate::util::config::{UserConfig, ChannelConfig};
-use crate::util::events::{EventHandler, EventsProvider, MessageSendEvent, MessageSendEventsProvider, ClosureReason, HTLCDestination};
+use crate::util::events::{Event, EventHandler, EventsProvider, MessageSendEvent, MessageSendEventsProvider, ClosureReason, HTLCDestination};
 use crate::util::{byte_utils, events};
 use crate::util::wakers::{Future, Notifier};
 use crate::util::scid_utils::fake_scid;
@@ -112,7 +112,8 @@ pub(super) struct PendingHTLCInfo {
 	pub(super) routing: PendingHTLCRouting,
 	pub(super) incoming_shared_secret: [u8; 32],
 	payment_hash: PaymentHash,
-	pub(super) amt_to_forward: u64,
+	pub(super) incoming_amt_msat: Option<u64>, // Added in 0.0.113
+	pub(super) outgoing_amt_msat: u64,
 	pub(super) outgoing_cltv_value: u32,
 }
 
@@ -291,7 +292,7 @@ type ShutdownResult = (Option<(OutPoint, ChannelMonitorUpdate)>, Vec<(HTLCSource, PaymentHash, PublicKey, [u8; 32])>);
 
 struct MsgHandleErrInternal {
 	err: msgs::LightningError,
-	chan_id: Option<([u8; 32], u64)>, // If Some a channel of ours has been closed
+	chan_id: Option<([u8; 32], u128)>, // If Some a channel of ours has been closed
 	shutdown_finish: Option<(ShutdownResult, Option<msgs::ChannelUpdate>)>,
 }
 impl MsgHandleErrInternal {
@@ -327,7 +328,7 @@ impl MsgHandleErrInternal {
 		Self { err, chan_id: None, shutdown_finish: None }
 	}
 	#[inline]
-	fn from_finish_shutdown(err: String, channel_id: [u8; 32], user_channel_id: u64, shutdown_res: ShutdownResult, channel_update: Option<msgs::ChannelUpdate>) -> Self {
+	fn from_finish_shutdown(err: String, channel_id: [u8; 32], user_channel_id: u128, shutdown_res: ShutdownResult, channel_update: Option<msgs::ChannelUpdate>) -> Self {
 		Self {
 			err: LightningError {
 				err: err.clone(),
@@ -397,13 +398,6 @@ pub(super) enum RAACommitmentOrder {
 // Note this is only exposed in cfg(test):
 pub(super) struct ChannelHolder<Signer: Sign> {
 	pub(super) by_id: HashMap<[u8; 32], Channel<Signer>>,
-	/// Map from payment hash to the payment data and any HTLCs which are to us and can be
-	/// failed/claimed by the user.
-	///
-	/// Note that while this is held in the same mutex as the channels themselves, no consistency
-	/// guarantees are made about the channels given here actually existing anymore by the time you
-	/// go to read them!
-	claimable_htlcs: HashMap<PaymentHash, (events::PaymentPurpose, Vec<ClaimableHTLC>)>,
 	/// Messages to send to peers - pushed to in the same lock that they are generated in (except
 	/// for broadcast messages, where ordering isn't as strict).
 	pub(super) pending_msg_events: Vec<MessageSendEvent>,
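With `amt_to_forward` split into inbound and outbound amounts, the fee taken at a forwarding hop falls out of a simple subtraction. A minimal sketch of that arithmetic, not part of the patch (the helper name is ours, not LDK's):

    /// Fee skimmed by a forwarding node, when the inbound amount is known (it
    /// is `None` for HTLCs deserialized from versions prior to 0.0.113).
    fn extracted_fee_msat(incoming_amt_msat: Option<u64>, outgoing_amt_msat: u64) -> Option<u64> {
        incoming_amt_msat.map(|inbound| inbound.saturating_sub(outgoing_amt_msat))
    }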
@@ -672,19 +666,21 @@ pub type SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, M, T, F, L> = ChannelManager<
 // |
 // |__`forward_htlcs`
 // |
-// |__`channel_state`
+// |__`pending_inbound_payments`
 // |  |
-// |  |__`id_to_peer`
+// |  |__`claimable_htlcs`
 // |  |
-// |  |__`short_to_chan_info`
-// |  |
-// |  |__`per_peer_state`
-// |  |
-// |  |__`outbound_scid_aliases`
+// |  |__`pending_outbound_payments`
 // |  |
-// |  |__`pending_inbound_payments`
+// |  |__`channel_state`
+// |  |
+// |  |__`id_to_peer`
 // |  |
-// |  |__`pending_outbound_payments`
+// |  |__`short_to_chan_info`
+// |  |
+// |  |__`per_peer_state`
+// |  |
+// |  |__`outbound_scid_aliases`
 // |  |
 // |  |__`best_block`
 // |  |
@@ -755,6 +751,15 @@ pub struct ChannelManager<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
 	#[cfg(not(test))]
 	forward_htlcs: Mutex<HashMap<u64, Vec<HTLCForwardInfo>>>,
 
+	/// Map from payment hash to the payment data and any HTLCs which are to us and can be
+	/// failed/claimed by the user.
+	///
+	/// Note that, no consistency guarantees are made about the channels given here actually
+	/// existing anymore by the time you go to read them!
+	///
+	/// See `ChannelManager` struct-level documentation for lock order requirements.
+	claimable_htlcs: Mutex<HashMap<PaymentHash, (events::PaymentPurpose, Vec<ClaimableHTLC>)>>,
+
 	/// The set of outbound SCID aliases across all our channels, including unconfirmed channels
 	/// and some closed channels which reached a usable state prior to being closed. This is used
 	/// only to avoid duplicates, and is not persisted explicitly to disk, but rebuilt from the
@@ -1085,8 +1090,10 @@ pub struct ChannelDetails {
 	///
 	/// [`outbound_capacity_msat`]: ChannelDetails::outbound_capacity_msat
 	pub unspendable_punishment_reserve: Option<u64>,
-	/// The `user_channel_id` passed in to create_channel, or 0 if the channel was inbound.
-	pub user_channel_id: u64,
+	/// The `user_channel_id` passed in to create_channel, or a random value if the channel was
+	/// inbound. This may be zero for inbound channels serialized with LDK versions prior to
+	/// 0.0.113.
+	pub user_channel_id: u128,
 	/// Our total balance. This is the amount we would get if we close the channel.
 	/// This value is not exact. Due to various in-flight changes and feerate changes, exactly this
 	/// amount is not likely to be recoverable on close.
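The effect of moving `claimable_htlcs` out of `channel_state` is easiest to see in miniature. A toy model of the new locking, with stand-in types rather than LDK's:

    use std::collections::HashMap;
    use std::sync::Mutex;

    struct Manager {
        // Previously, claimable HTLCs lived inside this lock, next to the channels.
        channel_state: Mutex<HashMap<[u8; 32], ()>>,
        // Now they sit behind their own lock (ordered per the tree above).
        claimable_htlcs: Mutex<HashMap<[u8; 32], Vec<u64>>>, // payment_hash -> part amounts
    }

    impl Manager {
        fn claimable_total_msat(&self, payment_hash: &[u8; 32]) -> u64 {
            // Reads no longer contend with per-channel message handling.
            self.claimable_htlcs.lock().unwrap()
                .get(payment_hash)
                .map(|parts| parts.iter().sum())
                .unwrap_or(0)
        }
    }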
@@ -1202,24 +1209,40 @@ impl ChannelDetails {
 #[derive(Clone, Debug)]
 pub enum PaymentSendFailure {
 	/// A parameter which was passed to send_payment was invalid, preventing us from attempting to
-	/// send the payment at all. No channel state has been changed or messages sent to peers, and
-	/// once you've changed the parameter at error, you can freely retry the payment in full.
+	/// send the payment at all.
+	///
+	/// You can freely resend the payment in full (with the parameter error fixed).
+	///
+	/// Because the payment failed outright, no payment tracking is done, you do not need to call
+	/// [`ChannelManager::abandon_payment`] and [`ChannelManager::retry_payment`] will *not* work
+	/// for this payment.
 	ParameterError(APIError),
 	/// A parameter in a single path which was passed to send_payment was invalid, preventing us
-	/// from attempting to send the payment at all. No channel state has been changed or messages
-	/// sent to peers, and once you've changed the parameter at error, you can freely retry the
-	/// payment in full.
+	/// from attempting to send the payment at all.
+	///
+	/// You can freely resend the payment in full (with the parameter error fixed).
 	///
 	/// The results here are ordered the same as the paths in the route object which was passed to
 	/// send_payment.
+	///
+	/// Because the payment failed outright, no payment tracking is done, you do not need to call
+	/// [`ChannelManager::abandon_payment`] and [`ChannelManager::retry_payment`] will *not* work
+	/// for this payment.
 	PathParameterError(Vec<Result<(), APIError>>),
 	/// All paths which were attempted failed to send, with no channel state change taking place.
-	/// You can freely retry the payment in full (though you probably want to do so over different
+	/// You can freely resend the payment in full (though you probably want to do so over different
 	/// paths than the ones selected).
 	///
-	/// [`ChannelManager::abandon_payment`] does *not* need to be called for this payment and
-	/// [`ChannelManager::retry_payment`] will *not* work for this payment.
-	AllFailedRetrySafe(Vec<APIError>),
+	/// Because the payment failed outright, no payment tracking is done, you do not need to call
+	/// [`ChannelManager::abandon_payment`] and [`ChannelManager::retry_payment`] will *not* work
+	/// for this payment.
+	AllFailedResendSafe(Vec<APIError>),
+	/// Indicates that a payment for the provided [`PaymentId`] is already in-flight and has not
+	/// yet completed (i.e. generated an [`Event::PaymentSent`]) or been abandoned (via
+	/// [`ChannelManager::abandon_payment`]).
+	///
+	/// [`Event::PaymentSent`]: events::Event::PaymentSent
+	DuplicatePayment,
 	/// Some paths which were attempted failed to send, though possibly not all. At least some
 	/// paths have irrevocably committed to the HTLC and retrying the payment in full would result
 	/// in over-/re-payment.
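For callers the reshuffle mostly changes match arms. A hedged sketch of handling the reworked variants (names from the hunk above; the actual retry/abandon logic is elided):

    use lightning::ln::channelmanager::PaymentSendFailure;

    fn on_send_result(res: Result<(), PaymentSendFailure>) {
        match res {
            Ok(()) => {},
            // Outright failures: nothing is tracked, resend the payment in full.
            Err(PaymentSendFailure::ParameterError(_e)) => {},
            Err(PaymentSendFailure::PathParameterError(_per_path)) => {},
            Err(PaymentSendFailure::AllFailedResendSafe(_errs)) => {},
            // A payment with this PaymentId is still pending; don't double-pay.
            Err(PaymentSendFailure::DuplicatePayment) => {},
            // Some HTLCs were committed: retry only the failed paths.
            Err(PaymentSendFailure::PartialFailure { .. }) => {},
        }
    }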
@@ -1498,134 +1521,6 @@ macro_rules! emit_channel_ready_event {
 	}
 }
 
-macro_rules! handle_chan_restoration_locked {
-	($self: ident, $channel_lock: expr, $channel_state: expr, $channel_entry: expr,
-	 $raa: expr, $commitment_update: expr, $order: expr, $chanmon_update: expr,
-	 $pending_forwards: expr, $funding_broadcastable: expr, $channel_ready: expr, $announcement_sigs: expr) => { {
-		let mut htlc_forwards = None;
-
-		let chanmon_update: Option<ChannelMonitorUpdate> = $chanmon_update; // Force type-checking to resolve
-		let chanmon_update_is_none = chanmon_update.is_none();
-		let counterparty_node_id = $channel_entry.get().get_counterparty_node_id();
-		let res = loop {
-			let forwards: Vec<(PendingHTLCInfo, u64)> = $pending_forwards; // Force type-checking to resolve
-			if !forwards.is_empty() {
-				htlc_forwards = Some(($channel_entry.get().get_short_channel_id().unwrap_or($channel_entry.get().outbound_scid_alias()),
-					$channel_entry.get().get_funding_txo().unwrap(), forwards));
-			}
-
-			if chanmon_update.is_some() {
-				// On reconnect, we, by definition, only resend a channel_ready if there have been
-				// no commitment updates, so the only channel monitor update which could also be
-				// associated with a channel_ready would be the funding_created/funding_signed
-				// monitor update. That monitor update failing implies that we won't send
-				// channel_ready until it's been updated, so we can't have a channel_ready and a
-				// monitor update here (so we don't bother to handle it correctly below).
-				assert!($channel_ready.is_none());
-				// A channel monitor update makes no sense without either a channel_ready or a
-				// commitment update to process after it. Since we can't have a channel_ready, we
-				// only bother to handle the monitor-update + commitment_update case below.
-				assert!($commitment_update.is_some());
-			}
-
-			if let Some(msg) = $channel_ready {
-				// Similar to the above, this implies that we're letting the channel_ready fly
-				// before it should be allowed to.
-				assert!(chanmon_update.is_none());
-				send_channel_ready!($self, $channel_state.pending_msg_events, $channel_entry.get(), msg);
-			}
-			if let Some(msg) = $announcement_sigs {
-				$channel_state.pending_msg_events.push(events::MessageSendEvent::SendAnnouncementSignatures {
-					node_id: counterparty_node_id,
-					msg,
-				});
-			}
-
-			emit_channel_ready_event!($self, $channel_entry.get_mut());
-
-			let funding_broadcastable: Option<Transaction> = $funding_broadcastable; // Force type-checking to resolve
-			if let Some(monitor_update) = chanmon_update {
-				// We only ever broadcast a funding transaction in response to a funding_signed
-				// message and the resulting monitor update. Thus, on channel_reestablish
-				// message handling we can't have a funding transaction to broadcast. When
-				// processing a monitor update finishing resulting in a funding broadcast, we
-				// cannot have a second monitor update, thus this case would indicate a bug.
-				assert!(funding_broadcastable.is_none());
-				// Given we were just reconnected or finished updating a channel monitor, the
-				// only case where we can get a new ChannelMonitorUpdate would be if we also
-				// have some commitment updates to send as well.
-				assert!($commitment_update.is_some());
-				match $self.chain_monitor.update_channel($channel_entry.get().get_funding_txo().unwrap(), monitor_update) {
-					ChannelMonitorUpdateStatus::Completed => {},
-					e => {
-						// channel_reestablish doesn't guarantee the order it returns is sensical
-						// for the messages it returns, but if we're setting what messages to
-						// re-transmit on monitor update success, we need to make sure it is sane.
-						let mut order = $order;
-						if $raa.is_none() {
-							order = RAACommitmentOrder::CommitmentFirst;
-						}
-						break handle_monitor_update_res!($self, e, $channel_entry, order, $raa.is_some(), true);
-					}
-				}
-			}
-
-			macro_rules! handle_cs { () => {
-				if let Some(update) = $commitment_update {
-					$channel_state.pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs {
-						node_id: counterparty_node_id,
-						updates: update,
-					});
-				}
-			} }
-			macro_rules! handle_raa { () => {
-				if let Some(revoke_and_ack) = $raa {
-					$channel_state.pending_msg_events.push(events::MessageSendEvent::SendRevokeAndACK {
-						node_id: counterparty_node_id,
-						msg: revoke_and_ack,
-					});
-				}
-			} }
-			match $order {
-				RAACommitmentOrder::CommitmentFirst => {
-					handle_cs!();
-					handle_raa!();
-				},
-				RAACommitmentOrder::RevokeAndACKFirst => {
-					handle_raa!();
-					handle_cs!();
-				},
-			}
-			if let Some(tx) = funding_broadcastable {
-				log_info!($self.logger, "Broadcasting funding transaction with txid {}", tx.txid());
-				$self.tx_broadcaster.broadcast_transaction(&tx);
-			}
-			break Ok(());
-		};
-
-		if chanmon_update_is_none {
-			// If there was no ChannelMonitorUpdate, we should never generate an Err in the res loop
-			// above. Doing so would imply calling handle_err!() from channel_monitor_updated() which
-			// should *never* end up calling back to `chain_monitor.update_channel()`.
-			assert!(res.is_ok());
-		}
-
-		(htlc_forwards, res, counterparty_node_id)
-	} }
-}
-
-macro_rules! post_handle_chan_restoration {
-	($self: ident, $locked_res: expr) => { {
-		let (htlc_forwards, res, counterparty_node_id) = $locked_res;
-
-		let _ = handle_error!($self, res, counterparty_node_id);
-
-		if let Some(forwards) = htlc_forwards {
-			$self.forward_htlcs(&mut [forwards][..]);
-		}
-	} }
-}
-
 impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F, L>
 	where M::Target: chain::Watch<<K::Target as KeysInterface>::Signer>,
 	      T::Target: BroadcasterInterface,
@@ -1659,13 +1554,13 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F, L>
 	/// Raises [`APIError::APIMisuseError`] when `channel_value_satoshis` > 2**24 or `push_msat` is
 	/// greater than `channel_value_satoshis * 1k` or `channel_value_satoshis < 1000`.
@@ -1743,7 +1637,7 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F, L>
-	pub fn create_channel(&self, their_network_key: PublicKey, channel_value_satoshis: u64, push_msat: u64, user_channel_id: u64, override_config: Option<UserConfig>) -> Result<[u8; 32], APIError> {
+	pub fn create_channel(&self, their_network_key: PublicKey, channel_value_satoshis: u64, push_msat: u64, user_channel_id: u128, override_config: Option<UserConfig>) -> Result<[u8; 32], APIError> {
 		if channel_value_satoshis < 1000 {
 			return Err(APIError::APIMisuseError { err: format!("Channel value must be at least 1000 satoshis. It was {}", channel_value_satoshis) });
 		}
@@ -1904,14 +1798,16 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F, L>
-				let (shutdown_msg, monitor_update, htlcs) = match self.per_peer_state.read().unwrap().get(&counterparty_node_id) {
-					Some(peer_state) => {
-						let peer_state = peer_state.lock().unwrap();
-						let their_features = &peer_state.latest_features;
-						chan_entry.get_mut().get_shutdown(&self.keys_manager, their_features, target_feerate_sats_per_1000_weight)?
-					},
-					None => return Err(APIError::ChannelUnavailable { err: format!("Not connected to node: {}", counterparty_node_id) }),
+				let (shutdown_msg, monitor_update, htlcs) = {
+					let per_peer_state = self.per_peer_state.read().unwrap();
+					match per_peer_state.get(&counterparty_node_id) {
+						Some(peer_state) => {
+							let peer_state = peer_state.lock().unwrap();
+							let their_features = &peer_state.latest_features;
+							chan_entry.get_mut().get_shutdown(&self.keys_manager, their_features, target_feerate_sats_per_1000_weight)?
+						},
+						None => return Err(APIError::ChannelUnavailable { err: format!("Not connected to node: {}", counterparty_node_id) }),
+					}
 				};
 
 				failed_htlcs = htlcs;
@@ -2137,13 +2033,6 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F, L>
-			msgs::OnionHopDataFormat::Legacy { .. } => {
-				return Err(ReceiveError {
-					err_code: 0x4000|0x2000|3,
-					err_data: Vec::new(),
-					msg: "We require payment_secrets",
-				});
-			},
 			msgs::OnionHopDataFormat::NonFinalNode { .. } => {
 				return Err(ReceiveError {
 					err_code: 0x4000|22,
@@ -2196,7 +2085,8 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F, L>
-			msgs::OnionHopDataFormat::Legacy { short_channel_id } => short_channel_id,
 			msgs::OnionHopDataFormat::NonFinalNode { short_channel_id } => short_channel_id,
 			msgs::OnionHopDataFormat::FinalNode { .. } => {
 				return_err!("Final Node OnionHopData provided for us as an intermediary node", 0x4000 | 22, &[0;0]);
@@ -2292,13 +2181,14 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F, L>
 				None => { // unknown_next_peer
 					// Note that this is likely a timing oracle for detecting whether an scid is a
 					// phantom.
-					if fake_scid::is_valid_phantom(&self.fake_scid_rand_bytes, *short_channel_id) {
+					if fake_scid::is_valid_phantom(&self.fake_scid_rand_bytes, *short_channel_id, &self.genesis_hash) {
 						None
 					} else {
 						break Some(("Don't have available channel for forwarding as requested.", 0x4000 | 10, None));
 					}
@@ -2349,10 +2239,10 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F, L>
 		match pending_outbounds.entry(payment_id) {
-			hash_map::Entry::Occupied(_) => Err(PaymentSendFailure::ParameterError(APIError::RouteError {
-				err: "Payment already in progress"
-			})),
+			hash_map::Entry::Occupied(_) => Err(PaymentSendFailure::DuplicatePayment),
 			hash_map::Entry::Vacant(entry) => {
 				let payment = entry.insert(PendingOutboundPayment::Retryable {
 					session_privs: HashSet::new(),
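Call sites of `create_channel` only need a wider integer. A sketch of invoking the updated signature, with placeholder values, inside a function returning `Result<_, APIError>` (`channel_manager` and `their_network_key` assumed in scope):

    // `user_channel_id` is opaque to LDK; any app-chosen 128-bit value works,
    // and it is echoed back in events such as Event::ChannelClosed.
    let user_channel_id: u128 = 42;
    let temporary_channel_id: [u8; 32] = channel_manager.create_channel(
        their_network_key, // counterparty's node pubkey
        100_000,           // channel_value_satoshis
        0,                 // push_msat
        user_channel_id,
        None,              // override_config
    )?;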
@@ -2753,7 +2641,7 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F, L>
@@ -3154,8 +3040,8 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F, L>
 									macro_rules! failure_handler {
@@ -3199,7 +3085,7 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F, L>
@@ -3217,7 +3103,7 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F, L>
 											onion_utils::Hop::Receive(hop_data) => {
-												match self.construct_recv_pending_htlc_info(hop_data, incoming_shared_secret, payment_hash, amt_to_forward, outgoing_cltv_value, Some(phantom_shared_secret)) {
+												match self.construct_recv_pending_htlc_info(hop_data, incoming_shared_secret, payment_hash, outgoing_amt_msat, outgoing_cltv_value, Some(phantom_shared_secret)) {
 													Ok(info) => phantom_receives.push((prev_short_channel_id, prev_funding_outpoint, vec![(info, prev_htlc_id)])),
 													Err(ReceiveError { err_code, err_data, msg }) => failed_payment!(msg, err_code, err_data, Some(phantom_shared_secret))
 												}
@@ -3248,6 +3134,8 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F, L>
 								None => {
 									forwarding_channel_not_found!();
@@ -3261,8 +3149,8 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F, L>
 									log_trace!(self.logger, "Adding HTLC from short id {} with payment_hash {} to channel with short id {} after delay", prev_short_channel_id, log_bytes!(payment_hash.0), short_chan_id);
@@ -3274,7 +3162,7 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F, L>
 									if let ChannelError::Ignore(msg) = e {
 										log_trace!(self.logger, "Failed to forward HTLC with payment_hash {}: {}", log_bytes!(payment_hash.0), msg);
@@ -3388,7 +3276,7 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F, L>
 								let (cltv_expiry, onion_payload, payment_data, phantom_shared_secret) = match routing {
@@ -3410,9 +3298,9 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F, L>
 							OnionPayload::Spontaneous(preimage) => {
-								match channel_state.claimable_htlcs.entry(payment_hash) {
+								match self.claimable_htlcs.lock().unwrap().entry(payment_hash) {
 									hash_map::Entry::Vacant(e) => {
 										let purpose = events::PaymentPurpose::SpontaneousPayment(preimage);
 										e.insert((purpose.clone(), vec![claimable_htlc]));
 										new_events.push(events::Event::PaymentReceived {
 											payment_hash,
-											amount_msat: amt_to_forward,
+											amount_msat: outgoing_amt_msat,
 											purpose,
 										});
 									},
@@ -3802,29 +3691,29 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F, L>
-				channel_state.claimable_htlcs.retain(|payment_hash, (_, htlcs)| {
-					if let OnionPayload::Invoice { .. } = htlcs[0].onion_payload {
-						// Check if we've received all the parts we need for an MPP (the value of the parts adds to total_msat).
-						// In this case we're not going to handle any timeouts of the parts here.
-						if htlcs[0].total_msat == htlcs.iter().fold(0, |total, htlc| total + htlc.value) {
-							return true;
-						} else if htlcs.into_iter().any(|htlc| {
-							htlc.timer_ticks += 1;
-							return htlc.timer_ticks >= MPP_TIMEOUT_TICKS
-						}) {
-							timed_out_mpp_htlcs.extend(htlcs.into_iter().map(|htlc| (htlc.prev_hop.clone(), payment_hash.clone())));
-							return false;
-						}
-					}
-					true
-				});
-			}
+			self.claimable_htlcs.lock().unwrap().retain(|payment_hash, (_, htlcs)| {
+				if let OnionPayload::Invoice { .. } = htlcs[0].onion_payload {
+					// Check if we've received all the parts we need for an MPP (the value of the parts adds to total_msat).
+					// In this case we're not going to handle any timeouts of the parts here.
+					if htlcs[0].total_msat == htlcs.iter().fold(0, |total, htlc| total + htlc.value) {
+						return true;
+					} else if htlcs.into_iter().any(|htlc| {
+						htlc.timer_ticks += 1;
+						return htlc.timer_ticks >= MPP_TIMEOUT_TICKS
+					}) {
+						timed_out_mpp_htlcs.extend(htlcs.into_iter().map(|htlc| (htlc.prev_hop.clone(), payment_hash.clone())));
+						return false;
+					}
+				}
+				true
+			});
 
 			for htlc_source in timed_out_mpp_htlcs.drain(..) {
 				let receiver = HTLCDestination::FailedPayment { payment_hash: htlc_source.1 };
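The retain predicate above encodes a small per-payment state machine: a complete MPP set is left alone, while an incomplete set ages by one tick per timer call and is dropped once any part is too old. A standalone sketch of just that decision (the constant value here is illustrative, not LDK's definition):

    const MPP_TIMEOUT_TICKS: u8 = 3; // illustrative stand-in for the real constant

    /// Returns whether a payment's parts should be kept, aging incomplete sets.
    /// Each part is modeled as (value_msat, timer_ticks).
    fn keep_parts(total_msat: u64, parts: &mut [(u64, u8)]) -> bool {
        let received: u64 = parts.iter().map(|(value, _)| value).sum();
        if received == total_msat {
            return true; // complete MPP: timeouts are not handled here
        }
        // Incomplete: bump tick counts; drop the payment once any part times out.
        !parts.iter_mut().any(|(_, ticks)| { *ticks += 1; *ticks >= MPP_TIMEOUT_TICKS })
    }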
@@ -3857,10 +3746,7 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F, L>
+	fn handle_channel_resumption(&self, pending_msg_events: &mut Vec<MessageSendEvent>,
+		channel: &mut Channel<<K::Target as KeysInterface>::Signer>, raa: Option<msgs::RevokeAndACK>,
+		commitment_update: Option<msgs::CommitmentUpdate>, order: RAACommitmentOrder,
+		pending_forwards: Vec<(PendingHTLCInfo, u64)>, funding_broadcastable: Option<Transaction>,
+		channel_ready: Option<msgs::ChannelReady>, announcement_sigs: Option<msgs::AnnouncementSignatures>)
+	-> Option<(u64, OutPoint, Vec<(PendingHTLCInfo, u64)>)> {
+		let mut htlc_forwards = None;
+
+		let counterparty_node_id = channel.get_counterparty_node_id();
+		if !pending_forwards.is_empty() {
+			htlc_forwards = Some((channel.get_short_channel_id().unwrap_or(channel.outbound_scid_alias()),
+				channel.get_funding_txo().unwrap(), pending_forwards));
+		}
+
+		if let Some(msg) = channel_ready {
+			send_channel_ready!(self, pending_msg_events, channel, msg);
+		}
+		if let Some(msg) = announcement_sigs {
+			pending_msg_events.push(events::MessageSendEvent::SendAnnouncementSignatures {
+				node_id: counterparty_node_id,
+				msg,
+			});
+		}
+
+		emit_channel_ready_event!(self, channel);
+
+		macro_rules! handle_cs { () => {
+			if let Some(update) = commitment_update {
+				pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs {
+					node_id: counterparty_node_id,
+					updates: update,
+				});
+			}
+		} }
+		macro_rules! handle_raa { () => {
+			if let Some(revoke_and_ack) = raa {
+				pending_msg_events.push(events::MessageSendEvent::SendRevokeAndACK {
+					node_id: counterparty_node_id,
+					msg: revoke_and_ack,
+				});
+			}
+		} }
+		match order {
+			RAACommitmentOrder::CommitmentFirst => {
+				handle_cs!();
+				handle_raa!();
+			},
+			RAACommitmentOrder::RevokeAndACKFirst => {
+				handle_raa!();
+				handle_cs!();
+			},
+		}
+
+		if let Some(tx) = funding_broadcastable {
+			log_info!(self.logger, "Broadcasting funding transaction with txid {}", tx.txid());
+			self.tx_broadcaster.broadcast_transaction(&tx);
+		}
+
+		htlc_forwards
+	}
+
 	fn channel_monitor_updated(&self, funding_txo: &OutPoint, highest_applied_update_id: u64) {
 		let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);
 
-		let chan_restoration_res;
+		let htlc_forwards;
 		let (mut pending_failures, finalized_claims, counterparty_node_id) = {
 			let mut channel_lock = self.channel_state.lock().unwrap();
 			let channel_state = &mut *channel_lock;
@@ -4510,14 +4459,16 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F, L>
-	pub fn accept_inbound_channel(&self, temporary_channel_id: &[u8; 32], counterparty_node_id: &PublicKey, user_channel_id: u64) -> Result<(), APIError> {
+	pub fn accept_inbound_channel(&self, temporary_channel_id: &[u8; 32], counterparty_node_id: &PublicKey, user_channel_id: u128) -> Result<(), APIError> {
 		self.do_accept_inbound_channel(temporary_channel_id, counterparty_node_id, false, user_channel_id)
 	}
 
@@ -4563,11 +4514,11 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F, L>
-	pub fn accept_inbound_channel_from_trusted_peer_0conf(&self, temporary_channel_id: &[u8; 32], counterparty_node_id: &PublicKey, user_channel_id: u64) -> Result<(), APIError> {
+	pub fn accept_inbound_channel_from_trusted_peer_0conf(&self, temporary_channel_id: &[u8; 32], counterparty_node_id: &PublicKey, user_channel_id: u128) -> Result<(), APIError> {
 		self.do_accept_inbound_channel(temporary_channel_id, counterparty_node_id, true, user_channel_id)
 	}
 
-	fn do_accept_inbound_channel(&self, temporary_channel_id: &[u8; 32], counterparty_node_id: &PublicKey, accept_0conf: bool, user_channel_id: u64) -> Result<(), APIError> {
+	fn do_accept_inbound_channel(&self, temporary_channel_id: &[u8; 32], counterparty_node_id: &PublicKey, accept_0conf: bool, user_channel_id: u128) -> Result<(), APIError> {
 		let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);
 
 		let mut channel_state_lock = self.channel_state.lock().unwrap();
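Manual acceptance flows change the same way. A hedged sketch, assuming `manually_accept_inbound_channels` is set and an `Event::OpenChannelRequest` is in hand; the RNG call is a placeholder, not an LDK API:

    // Tie our own 128-bit identifier to the inbound channel at acceptance time.
    let user_channel_id: u128 = rand_u128(); // hypothetical RNG helper
    channel_manager.accept_inbound_channel(
        &temporary_channel_id,
        &counterparty_node_id,
        user_channel_id,
    )?;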
@@ -4615,9 +4566,13 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F, L>
@@ -4640,7 +4595,7 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F, L>
 			hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel".to_owned(), msg.channel_id))
 		}
@@ -5264,8 +5219,8 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F, L>
 	fn internal_channel_reestablish(&self, counterparty_node_id: &PublicKey, msg: &msgs::ChannelReestablish) -> Result<(), MsgHandleErrInternal> {
-		let chan_restoration_res;
-		let (htlcs_failed_forward, need_lnd_workaround) = {
+		let htlc_forwards;
+		let need_lnd_workaround = {
 			let mut channel_state_lock = self.channel_state.lock().unwrap();
 			let channel_state = &mut *channel_state_lock;
@@ -5299,19 +5254,21 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F, L>
 				hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel".to_owned(), msg.channel_id))
 			}
 		};
-		post_handle_chan_restoration!(self, chan_restoration_res);
-		self.fail_holding_cell_htlcs(htlcs_failed_forward, msg.channel_id, counterparty_node_id);
+
+		if let Some(forwards) = htlc_forwards {
+			self.forward_htlcs(&mut [forwards][..]);
+		}
 
 		if let Some(channel_ready_msg) = need_lnd_workaround {
 			self.internal_channel_ready(counterparty_node_id, &channel_ready_msg)?;
@@ -5708,10 +5665,26 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F, L>
+	/// Gets inflight HTLC information by processing pending outbound payments that are in
+	/// our channels. May be used during pathfinding to account for in-use channel liquidity.
+	pub fn compute_inflight_htlcs(&self) -> InFlightHtlcs {
+		let mut inflight_htlcs = InFlightHtlcs::new();
+
+		for chan in self.channel_state.lock().unwrap().by_id.values() {
+			for htlc_source in chan.inflight_htlc_sources() {
+				if let HTLCSource::OutboundRoute { path, .. } = htlc_source {
+					inflight_htlcs.process_path(path, self.get_our_node_id());
+				}
+			}
+		}
+
+		inflight_htlcs
+	}
+
 	#[cfg(any(test, fuzzing, feature = "_test_utils"))]
 	pub fn get_and_clear_pending_events(&self) -> Vec<events::Event> {
 		let events = core::cell::RefCell::new(Vec::new());
-		let event_handler = |event: &events::Event| events.borrow_mut().push(event.clone());
+		let event_handler = |event: events::Event| events.borrow_mut().push(event);
 		self.process_pending_events(&event_handler);
 		events.into_inner()
 	}
@@ -5725,6 +5698,39 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F, L>
+	/// Processes any events asynchronously in the order they were generated since the last call
+	/// using the given event handler.
+	///
+	/// See the trait-level documentation of [`EventsProvider`] for requirements.
+	pub async fn process_pending_events_async<Future: core::future::Future, H: Fn(Event) -> Future>(
+		&self, handler: H
+	) {
+		// We'll acquire our total consistency lock until the returned future completes so that
+		// we can be sure no other persists happen while processing events.
+		let _read_guard = self.total_consistency_lock.read().unwrap();
+
+		let mut result = NotifyOption::SkipPersist;
+
+		// TODO: This behavior should be documented. It's unintuitive that we query
+		// ChannelMonitors when clearing other events.
+		if self.process_pending_monitor_events() {
+			result = NotifyOption::DoPersist;
+		}
+
+		let pending_events = mem::replace(&mut *self.pending_events.lock().unwrap(), vec![]);
+		if !pending_events.is_empty() {
+			result = NotifyOption::DoPersist;
+		}
+
+		for event in pending_events {
+			handler(event).await;
+		}
+
+		if result == NotifyOption::DoPersist {
+			self.persistence_notifier.notify();
+		}
+	}
 }
 
 impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> MessageSendEventsProvider for ChannelManager<M, T, K, F, L>
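A sketch of driving the new async entry point from an event loop: the handler takes each `Event` by value and returns a future (`channel_manager` assumed in scope, `Event` from `lightning::util::events`, inside an async task):

    channel_manager.process_pending_events_async(|event| async move {
        match event {
            Event::PaymentReceived { payment_hash, amount_msat, .. } => {
                // e.g. look up the invoice in an async datastore before claiming
            },
            _ => {},
        }
    }).await;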
@@ -5788,13 +5794,13 @@ where
 			result = NotifyOption::DoPersist;
 		}
 
-		let mut pending_events = mem::replace(&mut *self.pending_events.lock().unwrap(), vec![]);
+		let pending_events = mem::replace(&mut *self.pending_events.lock().unwrap(), vec![]);
 		if !pending_events.is_empty() {
 			result = NotifyOption::DoPersist;
 		}
 
-		for event in pending_events.drain(..) {
-			handler.handle_event(&event);
+		for event in pending_events {
+			handler.handle_event(event);
 		}
 
 		result
@@ -5903,12 +5909,12 @@ where
 		});
 	}
 
-	fn get_relevant_txids(&self) -> Vec<Txid> {
+	fn get_relevant_txids(&self) -> Vec<(Txid, Option<BlockHash>)> {
 		let channel_state = self.channel_state.lock().unwrap();
 		let mut res = Vec::with_capacity(channel_state.by_id.len());
 		for chan in channel_state.by_id.values() {
-			if let Some(funding_txo) = chan.get_funding_txo() {
-				res.push(funding_txo.txid);
+			if let (Some(funding_txo), block_hash) = (chan.get_funding_txo(), chan.get_funding_tx_confirmed_in()) {
+				res.push((funding_txo.txid, block_hash));
 			}
 		}
 		res
@@ -6030,28 +6036,28 @@ where
 				}
 				true
 			});
+		}
 
-			if let Some(height) = height_opt {
-				channel_state.claimable_htlcs.retain(|payment_hash, (_, htlcs)| {
-					htlcs.retain(|htlc| {
-						// If height is approaching the number of blocks we think it takes us to get
-						// our commitment transaction confirmed before the HTLC expires, plus the
-						// number of blocks we generally consider it to take to do a commitment update,
-						// just give up on it and fail the HTLC.
-						if height >= htlc.cltv_expiry - HTLC_FAIL_BACK_BUFFER {
-							let mut htlc_msat_height_data = byte_utils::be64_to_array(htlc.value).to_vec();
-							htlc_msat_height_data.extend_from_slice(&byte_utils::be32_to_array(height));
-
-							timed_out_htlcs.push((HTLCSource::PreviousHopData(htlc.prev_hop.clone()), payment_hash.clone(), HTLCFailReason::Reason {
-								failure_code: 0x4000 | 15,
-								data: htlc_msat_height_data
-							}, HTLCDestination::FailedPayment { payment_hash: payment_hash.clone() }));
-							false
-						} else { true }
-					});
-					!htlcs.is_empty() // Only retain this entry if htlcs has at least one entry.
-				});
-			}
+		if let Some(height) = height_opt {
+			self.claimable_htlcs.lock().unwrap().retain(|payment_hash, (_, htlcs)| {
+				htlcs.retain(|htlc| {
+					// If height is approaching the number of blocks we think it takes us to get
+					// our commitment transaction confirmed before the HTLC expires, plus the
+					// number of blocks we generally consider it to take to do a commitment update,
+					// just give up on it and fail the HTLC.
+					if height >= htlc.cltv_expiry - HTLC_FAIL_BACK_BUFFER {
+						let mut htlc_msat_height_data = byte_utils::be64_to_array(htlc.value).to_vec();
+						htlc_msat_height_data.extend_from_slice(&byte_utils::be32_to_array(height));
+
+						timed_out_htlcs.push((HTLCSource::PreviousHopData(htlc.prev_hop.clone()), payment_hash.clone(), HTLCFailReason::Reason {
+							failure_code: 0x4000 | 15,
+							data: htlc_msat_height_data
+						}, HTLCDestination::FailedPayment { payment_hash: payment_hash.clone() }));
+						false
+					} else { true }
+				});
+				!htlcs.is_empty() // Only retain this entry if htlcs has at least one entry.
+			});
+		}
 
 		self.handle_init_event_channel_failures(failed_channels);
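Consumers of the `Confirm` interface can use the paired block hash to detect reorgs of a funding transaction. A hedged sketch (`channel_manager` assumed in scope):

    for (txid, funding_block) in channel_manager.get_relevant_txids() {
        match funding_block {
            // Re-check that `txid` is still in the block we saw it confirm in;
            // if not, feed that back via `transaction_unconfirmed`.
            Some(block_hash) => { /* compare against the current chain */ },
            // Not yet confirmed (or confirmation block unknown): keep watching.
            None => {},
        }
    }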
@@ -6063,18 +6069,25 @@ where
 	/// Blocks until ChannelManager needs to be persisted or a timeout is reached. It returns a bool
 	/// indicating whether persistence is necessary. Only one listener on
-	/// `await_persistable_update` or `await_persistable_update_timeout` is guaranteed to be woken
-	/// up.
+	/// [`await_persistable_update`], [`await_persistable_update_timeout`], or a future returned by
+	/// [`get_persistable_update_future`] is guaranteed to be woken up.
 	///
 	/// Note that this method is not available with the `no-std` feature.
+	///
+	/// [`await_persistable_update`]: Self::await_persistable_update
+	/// [`await_persistable_update_timeout`]: Self::await_persistable_update_timeout
+	/// [`get_persistable_update_future`]: Self::get_persistable_update_future
 	#[cfg(any(test, feature = "std"))]
 	pub fn await_persistable_update_timeout(&self, max_wait: Duration) -> bool {
 		self.persistence_notifier.wait_timeout(max_wait)
 	}
 
 	/// Blocks until ChannelManager needs to be persisted. Only one listener on
-	/// `await_persistable_update` or `await_persistable_update_timeout` is guaranteed to be woken
-	/// up.
+	/// [`await_persistable_update`], `await_persistable_update_timeout`, or a future returned by
+	/// [`get_persistable_update_future`] is guaranteed to be woken up.
+	///
+	/// [`await_persistable_update`]: Self::await_persistable_update
+	/// [`get_persistable_update_future`]: Self::get_persistable_update_future
 	pub fn await_persistable_update(&self) {
 		self.persistence_notifier.wait()
 	}
@@ -6415,33 +6428,108 @@ impl_writeable_tlv_based!(ChannelCounterparty, {
 	(11, outbound_htlc_maximum_msat, option),
 });
 
-impl_writeable_tlv_based!(ChannelDetails, {
-	(1, inbound_scid_alias, option),
-	(2, channel_id, required),
-	(3, channel_type, option),
-	(4, counterparty, required),
-	(5, outbound_scid_alias, option),
-	(6, funding_txo, option),
-	(7, config, option),
-	(8, short_channel_id, option),
-	(10, channel_value_satoshis, required),
-	(12, unspendable_punishment_reserve, option),
-	(14, user_channel_id, required),
-	(16, balance_msat, required),
-	(18, outbound_capacity_msat, required),
-	// Note that by the time we get past the required read above, outbound_capacity_msat will be
-	// filled in, so we can safely unwrap it here.
-	(19, next_outbound_htlc_limit_msat, (default_value, outbound_capacity_msat.0.unwrap() as u64)),
-	(20, inbound_capacity_msat, required),
-	(22, confirmations_required, option),
-	(24, force_close_spend_delay, option),
-	(26, is_outbound, required),
-	(28, is_channel_ready, required),
-	(30, is_usable, required),
-	(32, is_public, required),
-	(33, inbound_htlc_minimum_msat, option),
-	(35, inbound_htlc_maximum_msat, option),
-});
+impl Writeable for ChannelDetails {
+	fn write<W: Writer>(&self, writer: &mut W) -> Result<(), io::Error> {
+		// `user_channel_id` used to be a single u64 value. In order to remain backwards compatible with
+		// versions prior to 0.0.113, the u128 is serialized as two separate u64 values.
+		let user_channel_id_low = self.user_channel_id as u64;
+		let user_channel_id_high_opt = Some((self.user_channel_id >> 64) as u64);
+		write_tlv_fields!(writer, {
+			(1, self.inbound_scid_alias, option),
+			(2, self.channel_id, required),
+			(3, self.channel_type, option),
+			(4, self.counterparty, required),
+			(5, self.outbound_scid_alias, option),
+			(6, self.funding_txo, option),
+			(7, self.config, option),
+			(8, self.short_channel_id, option),
+			(10, self.channel_value_satoshis, required),
+			(12, self.unspendable_punishment_reserve, option),
+			(14, user_channel_id_low, required),
+			(16, self.balance_msat, required),
+			(18, self.outbound_capacity_msat, required),
+			// Note that by the time we get past the required read above, outbound_capacity_msat will be
+			// filled in, so we can safely unwrap it here.
+			(19, self.next_outbound_htlc_limit_msat, (default_value, outbound_capacity_msat.0.unwrap() as u64)),
+			(20, self.inbound_capacity_msat, required),
+			(22, self.confirmations_required, option),
+			(24, self.force_close_spend_delay, option),
+			(26, self.is_outbound, required),
+			(28, self.is_channel_ready, required),
+			(30, self.is_usable, required),
+			(32, self.is_public, required),
+			(33, self.inbound_htlc_minimum_msat, option),
+			(35, self.inbound_htlc_maximum_msat, option),
+			(37, user_channel_id_high_opt, option),
+		});
+		Ok(())
+	}
+}
+
+impl Readable for ChannelDetails {
+	fn read<R: Read>(reader: &mut R) -> Result<Self, DecodeError> {
+		init_and_read_tlv_fields!(reader, {
+			(1, inbound_scid_alias, option),
+			(2, channel_id, required),
+			(3, channel_type, option),
+			(4, counterparty, required),
+			(5, outbound_scid_alias, option),
+			(6, funding_txo, option),
+			(7, config, option),
+			(8, short_channel_id, option),
+			(10, channel_value_satoshis, required),
+			(12, unspendable_punishment_reserve, option),
+			(14, user_channel_id_low, required),
+			(16, balance_msat, required),
+			(18, outbound_capacity_msat, required),
+			// Note that by the time we get past the required read above, outbound_capacity_msat will be
+			// filled in, so we can safely unwrap it here.
+			(19, next_outbound_htlc_limit_msat, (default_value, outbound_capacity_msat.0.unwrap() as u64)),
+			(20, inbound_capacity_msat, required),
+			(22, confirmations_required, option),
+			(24, force_close_spend_delay, option),
+			(26, is_outbound, required),
+			(28, is_channel_ready, required),
+			(30, is_usable, required),
+			(32, is_public, required),
+			(33, inbound_htlc_minimum_msat, option),
+			(35, inbound_htlc_maximum_msat, option),
+			(37, user_channel_id_high_opt, option),
+		});
+
+		// `user_channel_id` used to be a single u64 value. In order to remain backwards compatible with
+		// versions prior to 0.0.113, the u128 is serialized as two separate u64 values.
+		let user_channel_id_low: u64 = user_channel_id_low.0.unwrap();
+		let user_channel_id = user_channel_id_low as u128 +
+			((user_channel_id_high_opt.unwrap_or(0 as u64) as u128) << 64);
+
+		Ok(Self {
+			inbound_scid_alias,
+			channel_id: channel_id.0.unwrap(),
+			channel_type,
+			counterparty: counterparty.0.unwrap(),
+			outbound_scid_alias,
+			funding_txo,
+			config,
+			short_channel_id,
+			channel_value_satoshis: channel_value_satoshis.0.unwrap(),
+			unspendable_punishment_reserve,
+			user_channel_id,
+			balance_msat: balance_msat.0.unwrap(),
+			outbound_capacity_msat: outbound_capacity_msat.0.unwrap(),
+			next_outbound_htlc_limit_msat: next_outbound_htlc_limit_msat.0.unwrap(),
+			inbound_capacity_msat: inbound_capacity_msat.0.unwrap(),
+			confirmations_required,
+			force_close_spend_delay,
+			is_outbound: is_outbound.0.unwrap(),
+			is_channel_ready: is_channel_ready.0.unwrap(),
+			is_usable: is_usable.0.unwrap(),
+			is_public: is_public.0.unwrap(),
+			inbound_htlc_minimum_msat,
+			inbound_htlc_maximum_msat,
+		})
+	}
+}
 
 impl_writeable_tlv_based!(PhantomRouteHints, {
 	(2, channels, vec_type),
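The backwards-compatibility trick above is worth seeing in isolation: the low 64 bits keep the legacy type-14 TLV slot, while the high 64 bits ride in a new odd-numbered (and therefore optional) TLV that old readers skip. A self-contained sketch of the split and the read-side recombination:

    fn split_user_channel_id(id: u128) -> (u64, Option<u64>) {
        (id as u64, Some((id >> 64) as u64)) // (type 14, type 37)
    }

    fn recombine_user_channel_id(low: u64, high: Option<u64>) -> u128 {
        low as u128 + ((high.unwrap_or(0) as u128) << 64)
    }

    fn main() {
        let id: u128 = (0x0123_4567_89ab_cdef_u128 << 64) | 42;
        let (low, high) = split_user_channel_id(id);
        assert_eq!(recombine_user_channel_id(low, high), id);
        // A pre-0.0.113 reader sees only `low`; the value degrades to the low bits.
        assert_eq!(recombine_user_channel_id(low, None), 42);
    }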
@@ -6469,8 +6557,9 @@ impl_writeable_tlv_based!(PendingHTLCInfo, {
 	(0, routing, required),
 	(2, incoming_shared_secret, required),
 	(4, payment_hash, required),
-	(6, amt_to_forward, required),
-	(8, outgoing_cltv_value, required)
+	(6, outgoing_amt_msat, required),
+	(8, outgoing_cltv_value, required),
+	(9, incoming_amt_msat, option),
 });
 
@@ -6669,7 +6758,7 @@ impl Writeable for HTLCSource {
 					(1, payment_id_opt, option),
 					(2, first_hop_htlc_msat, required),
 					(3, payment_secret, option),
-					(4, path, vec_type),
+					(4, *path, vec_type),
 					(5, payment_params, option),
 				});
 			}
@@ -6788,10 +6877,13 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> Writeable for ChannelManager<M, T, K, F, L>
 			}
 		}
 
-		let channel_state = self.channel_state.lock().unwrap();
+		let pending_inbound_payments = self.pending_inbound_payments.lock().unwrap();
+		let claimable_htlcs = self.claimable_htlcs.lock().unwrap();
+		let pending_outbound_payments = self.pending_outbound_payments.lock().unwrap();
+
 		let mut htlc_purposes: Vec<&events::PaymentPurpose> = Vec::new();
-		(channel_state.claimable_htlcs.len() as u64).write(writer)?;
-		for (payment_hash, (purpose, previous_hops)) in channel_state.claimable_htlcs.iter() {
+		(claimable_htlcs.len() as u64).write(writer)?;
+		for (payment_hash, (purpose, previous_hops)) in claimable_htlcs.iter() {
 			payment_hash.write(writer)?;
 			(previous_hops.len() as u64).write(writer)?;
 			for htlc in previous_hops.iter() {
@@ -6808,8 +6900,6 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> Writeable for ChannelManager<M, T, K, F, L>
 			peer_state.latest_features.write(writer)?;
 		}
 
-		let pending_inbound_payments = self.pending_inbound_payments.lock().unwrap();
-		let pending_outbound_payments = self.pending_outbound_payments.lock().unwrap();
 		let events = self.pending_events.lock().unwrap();
 		(events.len() as u64).write(writer)?;
 		for event in events.iter() {
@@ -6917,10 +7007,10 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> Writeable for ChannelManager<M, T, K, F, L>
 /// which you've already broadcasted the transaction.
 ///
 /// [`ChainMonitor`]: crate::chain::chainmonitor::ChainMonitor
-pub struct ChannelManagerReadArgs<'a, Signer: 'a + Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
-	where M::Target: chain::Watch<Signer>,
+pub struct ChannelManagerReadArgs<'a, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
+	where M::Target: chain::Watch<<K::Target as KeysInterface>::Signer>,
 	      T::Target: BroadcasterInterface,
-	      K::Target: KeysInterface<Signer = Signer>,
+	      K::Target: KeysInterface,
 	      F::Target: FeeEstimator,
 	      L::Target: Logger,
 {
@@ -6963,14 +7053,14 @@ pub struct ChannelManagerReadArgs<'a, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
 	/// this struct.
 	///
 	/// (C-not exported) because we have no HashMap bindings
-	pub channel_monitors: HashMap<OutPoint, &'a mut ChannelMonitor<Signer>>,
+	pub channel_monitors: HashMap<OutPoint, &'a mut ChannelMonitor<<K::Target as KeysInterface>::Signer>>,
 }
 
-impl<'a, Signer: 'a + Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
-		ChannelManagerReadArgs<'a, Signer, M, T, K, F, L>
-	where M::Target: chain::Watch<Signer>,
+impl<'a, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
+		ChannelManagerReadArgs<'a, M, T, K, F, L>
+	where M::Target: chain::Watch<<K::Target as KeysInterface>::Signer>,
 	      T::Target: BroadcasterInterface,
-	      K::Target: KeysInterface<Signer = Signer>,
+	      K::Target: KeysInterface,
 	      F::Target: FeeEstimator,
 	      L::Target: Logger,
 {
@@ -6978,7 +7068,7 @@ impl<'a, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
 	/// HashMap for you. This is primarily useful for C bindings where it is not practical to
 	/// populate a HashMap directly from C.
 	pub fn new(keys_manager: K, fee_estimator: F, chain_monitor: M, tx_broadcaster: T, logger: L, default_config: UserConfig,
-			mut channel_monitors: Vec<&'a mut ChannelMonitor<Signer>>) -> Self {
+			mut channel_monitors: Vec<&'a mut ChannelMonitor<<K::Target as KeysInterface>::Signer>>) -> Self {
 		Self { keys_manager, fee_estimator, chain_monitor, tx_broadcaster, logger, default_config,
 			channel_monitors: channel_monitors.drain(..).map(|monitor| { (monitor.get_funding_txo().0, monitor) }).collect()
 		}
@@ -6989,28 +7079,28 @@ impl<'a, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
 // Implement ReadableArgs for an Arc'd ChannelManager to make it a bit easier to work with the
 // SipmleArcChannelManager type:
 impl<'a, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
-	ReadableArgs<ChannelManagerReadArgs<'a, <K::Target as KeysInterface>::Signer, M, T, K, F, L>> for (BlockHash, Arc<ChannelManager<M, T, K, F, L>>)
+	ReadableArgs<ChannelManagerReadArgs<'a, M, T, K, F, L>> for (BlockHash, Arc<ChannelManager<M, T, K, F, L>>)
 	where M::Target: chain::Watch<<K::Target as KeysInterface>::Signer>,
 	      T::Target: BroadcasterInterface,
 	      K::Target: KeysInterface,
 	      F::Target: FeeEstimator,
 	      L::Target: Logger,
 {
-	fn read<R: io::Read>(reader: &mut R, args: ChannelManagerReadArgs<'a, <K::Target as KeysInterface>::Signer, M, T, K, F, L>) -> Result<Self, DecodeError> {
+	fn read<R: io::Read>(reader: &mut R, args: ChannelManagerReadArgs<'a, M, T, K, F, L>) -> Result<Self, DecodeError> {
 		let (blockhash, chan_manager) = <(BlockHash, ChannelManager<M, T, K, F, L>)>::read(reader, args)?;
 		Ok((blockhash, Arc::new(chan_manager)))
 	}
 }
 
 impl<'a, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
-	ReadableArgs<ChannelManagerReadArgs<'a, <K::Target as KeysInterface>::Signer, M, T, K, F, L>> for (BlockHash, ChannelManager<M, T, K, F, L>)
+	ReadableArgs<ChannelManagerReadArgs<'a, M, T, K, F, L>> for (BlockHash, ChannelManager<M, T, K, F, L>)
 	where M::Target: chain::Watch<<K::Target as KeysInterface>::Signer>,
 	      T::Target: BroadcasterInterface,
 	      K::Target: KeysInterface,
 	      F::Target: FeeEstimator,
 	      L::Target: Logger,
 {
-	fn read<R: io::Read>(reader: &mut R, mut args: ChannelManagerReadArgs<'a, <K::Target as KeysInterface>::Signer, M, T, K, F, L>) -> Result<Self, DecodeError> {
+	fn read<R: io::Read>(reader: &mut R, mut args: ChannelManagerReadArgs<'a, M, T, K, F, L>) -> Result<Self, DecodeError> {
 		let _ver = read_ver_prefix!(reader, SERIALIZATION_VERSION);
 
 		let genesis_hash: BlockHash = Readable::read(reader)?;
@@ -7402,7 +7492,6 @@ impl<'a, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
 			channel_state: Mutex::new(ChannelHolder {
 				by_id,
-				claimable_htlcs,
 				pending_msg_events: Vec::new(),
 			}),
 			inbound_payment_key: expanded_inbound_key,
@@ -7410,6 +7499,7 @@ impl<'a, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
 
 			pending_outbound_payments: Mutex::new(pending_outbound_payments.unwrap()),
 			forward_htlcs: Mutex::new(forward_htlcs),
+			claimable_htlcs: Mutex::new(claimable_htlcs),
 			outbound_scid_aliases: Mutex::new(outbound_scid_aliases),
 			id_to_peer: Mutex::new(id_to_peer),
 			short_to_chan_info: FairRwLock::new(short_to_chan_info),
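With the explicit `Signer` parameter gone (it is now derived from `K::Target`), deserialization call sites lose one turbofish argument. A hedged sketch of the usual restart path (local names are placeholders):

    let read_args = ChannelManagerReadArgs::new(
        keys_manager, fee_estimator, chain_monitor, tx_broadcaster, logger,
        default_config, channel_monitor_refs, // Vec<&mut ChannelMonitor<_>>
    );
    let (block_hash, channel_manager) =
        <(BlockHash, ChannelManager<_, _, _, _, _>)>::read(&mut reader, read_args)?;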