X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning%2Fsrc%2Fln%2Fchannelmanager.rs;h=bcde81dd922b59bde534f0a1506b08924687ffb6;hb=71f98c13f8e5cab69f3084f0c8825132bc0111b7;hp=4f2fbad19e5edf06bd88827694d77e5ee6b8072e;hpb=b935b37ee5ce40d6f77c201e90b0b9a8dc353551;p=rust-lightning diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 4f2fbad1..bcde81dd 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -29,8 +29,8 @@ use chain::chaininterface::{BroadcasterInterface,ChainListener,FeeEstimator}; use chain::transaction::OutPoint; use ln::channel::{Channel, ChannelError}; use ln::channelmonitor::{ChannelMonitor, ChannelMonitorUpdateErr, ManyChannelMonitor, CLTV_CLAIM_BUFFER, LATENCY_GRACE_PERIOD_BLOCKS, ANTI_REORG_DELAY}; -use ln::router::Route; use ln::features::{InitFeatures, NodeFeatures}; +use ln::router::{Route, RouteHop}; use ln::msgs; use ln::onion_utils; use ln::msgs::{ChannelMessageHandler, DecodeError, LightningError}; @@ -76,6 +76,7 @@ enum PendingForwardReceiveHTLCInfo { }, Receive { payment_data: Option, + incoming_cltv_expiry: u32, // Used to track when we should expire pending HTLCs that go unclaimed }, } @@ -125,6 +126,7 @@ struct ClaimableHTLC { src: HTLCPreviousHopData, value: u64, payment_data: Option, + cltv_expiry: u32, } /// Tracks the inbound corresponding to an outbound HTLC @@ -132,7 +134,7 @@ struct ClaimableHTLC { pub(super) enum HTLCSource { PreviousHopData(HTLCPreviousHopData), OutboundRoute { - route: Route, + path: Vec, session_priv: SecretKey, /// Technically we can recalculate this from the route, but we cache it here to avoid /// doing a double-pass on route when we get a failure back @@ -143,7 +145,7 @@ pub(super) enum HTLCSource { impl HTLCSource { pub fn dummy() -> Self { HTLCSource::OutboundRoute { - route: Route { hops: Vec::new() }, + path: Vec::new(), session_priv: SecretKey::from_slice(&[1; 32]).unwrap(), first_hop_htlc_msat: 0, } @@ -284,11 +286,14 @@ pub(super) struct ChannelHolder { /// guarantees are made about the existence of a channel with the short id here, nor the short /// ids in the PendingHTLCInfo! pub(super) forward_htlcs: HashMap>, - /// Tracks things that were to us and can be failed/claimed by the user + /// (payment_hash, payment_secret) -> Vec for tracking things that + /// were to us and can be failed/claimed by the user /// Note that while this is held in the same mutex as the channels themselves, no consistency /// guarantees are made about the channels given here actually existing anymore by the time you /// go to read them! - claimable_htlcs: HashMap>, + /// TODO: We need to time out HTLCs sitting here which are waiting on other AMP HTLCs to + /// arrive. + claimable_htlcs: HashMap<(PaymentHash, Option<[u8; 32]>), Vec>, /// Messages to send to peers - pushed to in the same lock that they are generated in (except /// for broadcast messages, where ordering isn't as strict). pub(super) pending_msg_events: Vec, @@ -425,15 +430,6 @@ const CHECK_CLTV_EXPIRY_SANITY: u32 = CLTV_EXPIRY_DELTA as u32 - LATENCY_GRACE_P #[allow(dead_code)] const CHECK_CLTV_EXPIRY_SANITY_2: u32 = CLTV_EXPIRY_DELTA as u32 - LATENCY_GRACE_PERIOD_BLOCKS - 2*CLTV_CLAIM_BUFFER; -macro_rules! secp_call { - ( $res: expr, $err: expr ) => { - match $res { - Ok(key) => key, - Err(_) => return Err($err), - } - }; -} - /// Details of a channel, as returned by ChannelManager::list_channels and ChannelManager::list_usable_channels pub struct ChannelDetails { /// The channel's ID (prior to funding transaction generation, this is a random 32 bytes, @@ -470,22 +466,65 @@ pub struct ChannelDetails { pub is_live: bool, } +/// If a payment fails to send, it can be in one of several states. This enum is returned as the +/// Err() type describing which state the payment is in, see the description of individual enum +/// states for more. +#[derive(Debug)] +pub enum PaymentSendFailure { + /// A parameter which was passed to send_payment was invalid, preventing us from attempting to + /// send the payment at all. No channel state has been changed or messages sent to peers, and + /// once you've changed the parameter at error, you can freely retry the payment in full. + ParameterError(APIError), + /// All paths which were attempted failed to send, with no channel state change taking place. + /// You can freely retry the payment in full (though you probably want to do so over different + /// paths than the ones selected). + AllFailedRetrySafe(Vec), + /// Some paths which were attempted failed to send, though possibly not all. At least some + /// paths have irrevocably committed to the HTLC and retrying the payment in full would result + /// in over-/re-payment. + /// + /// The results here are ordered the same as the paths in the route object which was passed to + /// send_payment, and any Errs which are not APIError::MonitorUpdateFailed can be safely + /// retried. + /// + /// Any entries which contain Err(APIError::MonitorUpdateFailed) or Ok(()) MUST NOT be retried + /// as they will result in over-/re-payment. + PartialFailure(Vec>), +} + macro_rules! handle_error { - ($self: ident, $internal: expr, $their_node_id: expr, $locked_channel_state: expr) => { + ($self: ident, $internal: expr, $their_node_id: expr) => { match $internal { Ok(msg) => Ok(msg), Err(MsgHandleErrInternal { err, shutdown_finish }) => { + let mut channel_state = None; if let Some((shutdown_res, update_option)) = shutdown_finish { $self.finish_force_close_channel(shutdown_res); if let Some(update) = update_option { - $locked_channel_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { + channel_state = Some($self.channel_state.lock().unwrap()); + channel_state.as_mut().unwrap().pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { msg: update }); } } + #[cfg(debug_assertions)] + { + // In testing, we always lock here to ensure there are no deadlocks where we + // were holding the lock coming into the macro but didn't catch it because we + // didn't generate an action and didn't have any HTLCs to fail backwards in the + // finish_force_close_channel. + if channel_state.is_none() { + channel_state = Some($self.channel_state.lock().unwrap()); + } + } log_error!($self, "{}", err.err); if let msgs::ErrorAction::IgnoreError = err.action { - } else { $locked_channel_state.pending_msg_events.push(events::MessageSendEvent::HandleError { node_id: $their_node_id, action: err.action.clone() }); } + } else { + if channel_state.is_none() { + channel_state = Some($self.channel_state.lock().unwrap()); + } + channel_state.as_mut().unwrap().pending_msg_events.push(events::MessageSendEvent::HandleError { node_id: $their_node_id, action: err.action.clone() }); + } // Return error in case higher-API need one Err(err) }, @@ -653,6 +692,7 @@ impl ChannelManager where M::T let res = ChannelManager { default_configuration: config.clone(), genesis_hash: genesis_block(network).header.bitcoin_hash(), + //genesis_hash: Sha256dHash::from_hex("0f9188f13cb7b2c71f2a335e3a4fc328bf5beb436012afca590b1a11466e2206").unwrap(), fee_estimator: feeest.clone(), monitor, tx_broadcaster, @@ -993,7 +1033,10 @@ impl ChannelManager where M::T // delay) once they've send us a commitment_signed! PendingHTLCStatus::Forward(PendingHTLCInfo { - type_data: PendingForwardReceiveHTLCInfo::Receive { payment_data }, + type_data: PendingForwardReceiveHTLCInfo::Receive { + payment_data, + incoming_cltv_expiry: msg.cltv_expiry, + }, payment_hash: msg.payment_hash.clone(), incoming_shared_secret: shared_secret, amt_to_forward: next_hop_data.amt_to_forward, @@ -1164,99 +1207,154 @@ impl ChannelManager where M::T /// payment_preimage tracking (which you should already be doing as they represent "proof of /// payment") and prevent double-sends yourself. /// - /// May generate a SendHTLCs message event on success, which should be relayed. + /// May generate SendHTLCs message(s) event on success, which should be relayed. /// - /// Raises APIError::RoutError when invalid route or forward parameter - /// (cltv_delta, fee, node public key) is specified. - /// Raises APIError::ChannelUnavailable if the next-hop channel is not available for updates - /// (including due to previous monitor update failure or new permanent monitor update failure). - /// Raised APIError::MonitorUpdateFailed if a new monitor update failure prevented sending the - /// relevant updates. + /// Each path may have a different return value, and PaymentSendValue may return a Vec with + /// each entry matching the corresponding-index entry in the route paths. /// - /// In case of APIError::RouteError/APIError::ChannelUnavailable, the payment send has failed - /// and you may wish to retry via a different route immediately. - /// In case of APIError::MonitorUpdateFailed, the commitment update has been irrevocably - /// committed on our end and we're just waiting for a monitor update to send it. Do NOT retry - /// the payment via a different route unless you intend to pay twice! - pub fn send_payment(&self, route: Route, payment_hash: PaymentHash) -> Result<(), APIError> { - if route.hops.len() < 1 || route.hops.len() > 20 { - return Err(APIError::RouteError{err: "Route didn't go anywhere/had bogus size"}); - } + /// In general, a path may raise: + /// * APIError::RouteError when an invalid route or forwarding parameter (cltv_delta, fee, + /// node public key) is specified. + /// * APIError::ChannelUnavailable if the next-hop channel is not available for updates + /// (including due to previous monitor update failure or new permanent monitor update + /// failure). + /// * APIError::MonitorUpdateFailed if a new monitor update failure prevented sending the + /// relevant updates. + /// + /// Note that depending on the type of the PaymentSendFailure the HTLC may have been + /// irrevocably committed to on our end. In such a case, do NOT retry the payment with a + /// different route unless you intend to pay twice! + /// + /// payment_secret is unrelated to payment_hash (or PaymentPreimage) and exists to authenticate + /// the sender to the recipient and prevent payment-probing (deanonymization) attacks. For + /// newer nodes, it will be provided to you in the invoice. If you do not have one, the Route + /// must not contain multiple paths as otherwise the multipath data cannot be sent. + /// If a payment_secret *is* provided, we assume that the invoice had the basic_mpp feature bit + /// set (either as required or as available). + pub fn send_payment(&self, route: Route, payment_hash: PaymentHash, payment_secret: Option<&[u8; 32]>) -> Result<(), PaymentSendFailure> { + if route.paths.len() < 1 { + return Err(PaymentSendFailure::ParameterError(APIError::RouteError{err: "There must be at least one path to send over"})); + } + if route.paths.len() > 10 { + return Err(PaymentSendFailure::ParameterError(APIError::RouteError{err: "Sending over more than 10 paths is not currently supported"})); + } + let mut total_value = 0; let our_node_id = self.get_our_node_id(); - for (idx, hop) in route.hops.iter().enumerate() { - if idx != route.hops.len() - 1 && hop.pubkey == our_node_id { - return Err(APIError::RouteError{err: "Route went through us but wasn't a simple rebalance loop to us"}); + for path in route.paths.iter() { + if path.len() < 1 || path.len() > 20 { + return Err(PaymentSendFailure::ParameterError(APIError::RouteError{err: "Path didn't go anywhere/had bogus size"})); } + for (idx, hop) in path.iter().enumerate() { + if idx != path.len() - 1 && hop.pubkey == our_node_id { + return Err(PaymentSendFailure::ParameterError(APIError::RouteError{err: "Path went through us but wasn't a simple rebalance loop to us"})); + } + } + total_value += path.last().unwrap().fee_msat; } - - let (session_priv, prng_seed) = self.keys_manager.get_onion_rand(); - let cur_height = self.latest_block_height.load(Ordering::Acquire) as u32 + 1; + let mut results = Vec::new(); + 'path_loop: for path in route.paths.iter() { + macro_rules! check_res_push { + ($res: expr) => { match $res { + Ok(r) => r, + Err(e) => { + results.push(Err(e)); + continue 'path_loop; + }, + } + } + } - let onion_keys = secp_call!(onion_utils::construct_onion_keys(&self.secp_ctx, &route, &session_priv), - APIError::RouteError{err: "Pubkey along hop was maliciously selected"}); - let (onion_payloads, htlc_msat, htlc_cltv) = onion_utils::build_onion_payloads(&route, cur_height)?; - if onion_utils::route_size_insane(&onion_payloads) { - return Err(APIError::RouteError{err: "Route had too large size once"}); - } - let onion_packet = onion_utils::construct_onion_packet(onion_payloads, onion_keys, prng_seed, &payment_hash); + log_trace!(self, "Attempting to send payment for path with next hop {}", path.first().unwrap().short_channel_id); + let (session_priv, prng_seed) = self.keys_manager.get_onion_rand(); - let _ = self.total_consistency_lock.read().unwrap(); + let onion_keys = check_res_push!(onion_utils::construct_onion_keys(&self.secp_ctx, &path, &session_priv) + .map_err(|_| APIError::RouteError{err: "Pubkey along hop was maliciously selected"})); + let (onion_payloads, htlc_msat, htlc_cltv) = check_res_push!(onion_utils::build_onion_payloads(&path, total_value, payment_secret, cur_height)); + if onion_utils::route_size_insane(&onion_payloads) { + check_res_push!(Err(APIError::RouteError{err: "Route had too large size once"})); + } + let onion_packet = onion_utils::construct_onion_packet(onion_payloads, onion_keys, prng_seed, &payment_hash); - let mut channel_lock = self.channel_state.lock().unwrap(); - let err: Result<(), _> = loop { + let _ = self.total_consistency_lock.read().unwrap(); - let id = match channel_lock.short_to_id.get(&route.hops.first().unwrap().short_channel_id) { - None => return Err(APIError::ChannelUnavailable{err: "No channel available with first hop!"}), - Some(id) => id.clone(), - }; + let err: Result<(), _> = loop { + let mut channel_lock = self.channel_state.lock().unwrap(); + let id = match channel_lock.short_to_id.get(&path.first().unwrap().short_channel_id) { + None => check_res_push!(Err(APIError::ChannelUnavailable{err: "No channel available with first hop!"})), + Some(id) => id.clone(), + }; - let channel_state = &mut *channel_lock; - if let hash_map::Entry::Occupied(mut chan) = channel_state.by_id.entry(id) { - match { - if chan.get().get_their_node_id() != route.hops.first().unwrap().pubkey { - return Err(APIError::RouteError{err: "Node ID mismatch on first hop!"}); - } - if !chan.get().is_live() { - return Err(APIError::ChannelUnavailable{err: "Peer for first hop currently disconnected/pending monitor update!"}); - } - break_chan_entry!(self, chan.get_mut().send_htlc_and_commit(htlc_msat, payment_hash.clone(), htlc_cltv, HTLCSource::OutboundRoute { - route: route.clone(), - session_priv: session_priv.clone(), - first_hop_htlc_msat: htlc_msat, - }, onion_packet), channel_state, chan) - } { - Some((update_add, commitment_signed, chan_monitor)) => { - if let Err(e) = self.monitor.add_update_monitor(chan_monitor.get_funding_txo().unwrap(), chan_monitor) { - maybe_break_monitor_err!(self, e, channel_state, chan, RAACommitmentOrder::CommitmentFirst, false, true); - // Note that MonitorUpdateFailed here indicates (per function docs) - // that we will resent the commitment update once we unfree monitor - // updating, so we have to take special care that we don't return - // something else in case we will resend later! - return Err(APIError::MonitorUpdateFailed); + let channel_state = &mut *channel_lock; + if let hash_map::Entry::Occupied(mut chan) = channel_state.by_id.entry(id) { + match { + if chan.get().get_their_node_id() != path.first().unwrap().pubkey { + check_res_push!(Err(APIError::RouteError{err: "Node ID mismatch on first hop!"})); + } + if !chan.get().is_live() { + check_res_push!(Err(APIError::ChannelUnavailable{err: "Peer for first hop currently disconnected/pending monitor update!"})); } + break_chan_entry!(self, chan.get_mut().send_htlc_and_commit(htlc_msat, payment_hash.clone(), htlc_cltv, HTLCSource::OutboundRoute { + path: path.clone(), + session_priv: session_priv.clone(), + first_hop_htlc_msat: htlc_msat, + }, onion_packet), channel_state, chan) + } { + Some((update_add, commitment_signed, chan_monitor)) => { + if let Err(e) = self.monitor.add_update_monitor(chan_monitor.get_funding_txo().unwrap(), chan_monitor) { + maybe_break_monitor_err!(self, e, channel_state, chan, RAACommitmentOrder::CommitmentFirst, false, true); + // Note that MonitorUpdateFailed here indicates (per function docs) + // that we will resent the commitment update once we unfree monitor + // updating, so we have to take special care that we don't return + // something else in case we will resend later! + check_res_push!(Err(APIError::MonitorUpdateFailed)); + } - channel_state.pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs { - node_id: route.hops.first().unwrap().pubkey, - updates: msgs::CommitmentUpdate { - update_add_htlcs: vec![update_add], - update_fulfill_htlcs: Vec::new(), - update_fail_htlcs: Vec::new(), - update_fail_malformed_htlcs: Vec::new(), - update_fee: None, - commitment_signed, - }, - }); - }, - None => {}, - } - } else { unreachable!(); } - return Ok(()); - }; + channel_state.pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs { + node_id: path.first().unwrap().pubkey, + updates: msgs::CommitmentUpdate { + update_add_htlcs: vec![update_add], + update_fulfill_htlcs: Vec::new(), + update_fail_htlcs: Vec::new(), + update_fail_malformed_htlcs: Vec::new(), + update_fee: None, + commitment_signed, + }, + }); + }, + None => {}, + } + } else { unreachable!(); } + results.push(Ok(())); + continue 'path_loop; + }; - match handle_error!(self, err, route.hops.first().unwrap().pubkey, channel_lock) { - Ok(_) => unreachable!(), - Err(e) => { Err(APIError::ChannelUnavailable { err: e.err }) } + match handle_error!(self, err, path.first().unwrap().pubkey) { + Ok(_) => unreachable!(), + Err(e) => { + check_res_push!(Err(APIError::ChannelUnavailable { err: e.err })); + }, + } + } + let mut has_ok = false; + let mut has_err = false; + for res in results.iter() { + if res.is_ok() { has_ok = true; } + if res.is_err() { has_err = true; } + if let &Err(APIError::MonitorUpdateFailed) = res { + // MonitorUpdateFailed is inherently unsafe to retry, so we call it a + // PartialFailure. + has_err = true; + has_ok = true; + break; + } + } + if has_err && has_ok { + Err(PaymentSendFailure::PartialFailure(results)) + } else if has_err { + Err(PaymentSendFailure::AllFailedRetrySafe(results.drain(..).map(|r| r.unwrap_err()).collect())) + } else { + Ok(()) } } @@ -1273,8 +1371,7 @@ impl ChannelManager where M::T let _ = self.total_consistency_lock.read().unwrap(); let (mut chan, msg, chan_monitor) = { - let mut channel_state = self.channel_state.lock().unwrap(); - let (res, chan) = match channel_state.by_id.remove(temporary_channel_id) { + let (res, chan) = match self.channel_state.lock().unwrap().by_id.remove(temporary_channel_id) { Some(mut chan) => { (chan.get_outbound_funding_created(funding_txo) .map_err(|e| if let ChannelError::Close(msg) = e { @@ -1284,7 +1381,7 @@ impl ChannelManager where M::T }, None => return }; - match handle_error!(self, res, chan.get_their_node_id(), channel_state) { + match handle_error!(self, res, chan.get_their_node_id()) { Ok(funding_msg) => { (chan, funding_msg.0, funding_msg.1) }, @@ -1296,12 +1393,9 @@ impl ChannelManager where M::T if let Err(e) = self.monitor.add_update_monitor(chan_monitor.get_funding_txo().unwrap(), chan_monitor) { match e { ChannelMonitorUpdateErr::PermanentFailure => { - { - let mut channel_state = self.channel_state.lock().unwrap(); - match handle_error!(self, Err(MsgHandleErrInternal::from_finish_shutdown("ChannelMonitor storage failure", *temporary_channel_id, chan.force_shutdown(), None)), chan.get_their_node_id(), channel_state) { - Err(_) => { return; }, - Ok(()) => unreachable!(), - } + match handle_error!(self, Err(MsgHandleErrInternal::from_finish_shutdown("ChannelMonitor storage failure", *temporary_channel_id, chan.force_shutdown(), None)), chan.get_their_node_id()) { + Err(_) => { return; }, + Ok(()) => unreachable!(), } }, ChannelMonitorUpdateErr::TemporaryFailure => { @@ -1406,7 +1500,9 @@ impl ChannelManager where M::T htlc_id: prev_htlc_id, incoming_packet_shared_secret: forward_info.incoming_shared_secret, }); - failed_forwards.push((htlc_source, forward_info.payment_hash, 0x4000 | 10, None)); + failed_forwards.push((htlc_source, forward_info.payment_hash, + HTLCFailReason::Reason { failure_code: 0x1000 | 7, data: Vec::new() } + )); }, HTLCForwardInfo::FailHTLC { .. } => { // Channel went away before we could fail it. This implies @@ -1442,7 +1538,9 @@ impl ChannelManager where M::T panic!("Stated return value requirements in send_htlc() were not met"); } let chan_update = self.get_channel_update(chan.get()).unwrap(); - failed_forwards.push((htlc_source, payment_hash, 0x1000 | 7, Some(chan_update))); + failed_forwards.push((htlc_source, payment_hash, + HTLCFailReason::Reason { failure_code: 0x1000 | 7, data: chan_update.encode_with_len() } + )); continue; }, Ok(update_add) => { @@ -1515,10 +1613,8 @@ impl ChannelManager where M::T }, ChannelError::CloseDelayBroadcast { .. } => { panic!("Wait is only generated on receipt of channel_reestablish, which is handled by try_chan_entry, we don't bother to support it here"); } }; - match handle_error!(self, err, their_node_id, channel_state) { - Ok(_) => unreachable!(), - Err(_) => { continue; }, - } + handle_errors.push((their_node_id, err)); + continue; } }; if let Err(e) = self.monitor.add_update_monitor(monitor.get_funding_txo().unwrap(), monitor) { @@ -1544,22 +1640,56 @@ impl ChannelManager where M::T for forward_info in pending_forwards.drain(..) { match forward_info { HTLCForwardInfo::AddHTLC { prev_short_channel_id, prev_htlc_id, forward_info: PendingHTLCInfo { - type_data: PendingForwardReceiveHTLCInfo::Receive { payment_data }, + type_data: PendingForwardReceiveHTLCInfo::Receive { payment_data, incoming_cltv_expiry }, incoming_shared_secret, payment_hash, amt_to_forward, .. }, } => { let prev_hop_data = HTLCPreviousHopData { short_channel_id: prev_short_channel_id, htlc_id: prev_htlc_id, incoming_packet_shared_secret: incoming_shared_secret, }; - channel_state.claimable_htlcs.entry(payment_hash).or_insert(Vec::new()).push(ClaimableHTLC { + + let mut total_value = 0; + let htlcs = channel_state.claimable_htlcs.entry((payment_hash, if let &Some(ref data) = &payment_data { + Some(data.payment_secret.clone()) } else { None })) + .or_insert(Vec::new()); + htlcs.push(ClaimableHTLC { src: prev_hop_data, value: amt_to_forward, - payment_data, - }); - new_events.push(events::Event::PaymentReceived { - payment_hash: payment_hash, - amt: amt_to_forward, + payment_data: payment_data.clone(), + cltv_expiry: incoming_cltv_expiry, }); + if let &Some(ref data) = &payment_data { + for htlc in htlcs.iter() { + total_value += htlc.value; + if htlc.payment_data.as_ref().unwrap().total_msat != data.total_msat { + total_value = msgs::MAX_VALUE_MSAT; + } + if total_value >= msgs::MAX_VALUE_MSAT { break; } + } + if total_value >= msgs::MAX_VALUE_MSAT { + for htlc in htlcs.iter() { + failed_forwards.push((HTLCSource::PreviousHopData(HTLCPreviousHopData { + short_channel_id: htlc.src.short_channel_id, + htlc_id: htlc.src.htlc_id, + incoming_packet_shared_secret: htlc.src.incoming_packet_shared_secret, + }), payment_hash, + HTLCFailReason::Reason { failure_code: 0x4000 | 15, data: byte_utils::be64_to_array(htlc.value).to_vec() } + )); + } + } else if total_value >= data.total_msat { + new_events.push(events::Event::PaymentReceived { + payment_hash: payment_hash, + payment_secret: Some(data.payment_secret), + amt: total_value, + }); + } + } else { + new_events.push(events::Event::PaymentReceived { + payment_hash: payment_hash, + payment_secret: None, + amt: amt_to_forward, + }); + } }, HTLCForwardInfo::AddHTLC { .. } => { panic!("short_channel_id == 0 should imply any pending_forward entries are of type Receive"); @@ -1573,18 +1703,12 @@ impl ChannelManager where M::T } } - for (htlc_source, payment_hash, failure_code, update) in failed_forwards.drain(..) { - match update { - None => self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), htlc_source, &payment_hash, HTLCFailReason::Reason { failure_code, data: Vec::new() }), - Some(chan_update) => self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), htlc_source, &payment_hash, HTLCFailReason::Reason { failure_code, data: chan_update.encode_with_len() }), - }; + for (htlc_source, payment_hash, failure_reason) in failed_forwards.drain(..) { + self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), htlc_source, &payment_hash, failure_reason); } - if handle_errors.len() > 0 { - let mut channel_state_lock = self.channel_state.lock().unwrap(); - for (their_node_id, err) in handle_errors.drain(..) { - let _ = handle_error!(self, err, their_node_id, channel_state_lock); - } + for (their_node_id, err) in handle_errors.drain(..) { + let _ = handle_error!(self, err, their_node_id); } if new_events.is_empty() { return } @@ -1622,11 +1746,11 @@ impl ChannelManager where M::T /// along the path (including in our own channel on which we received it). /// Returns false if no payment was found to fail backwards, true if the process of failing the /// HTLC backwards has been started. - pub fn fail_htlc_backwards(&self, payment_hash: &PaymentHash) -> bool { + pub fn fail_htlc_backwards(&self, payment_hash: &PaymentHash, payment_secret: &Option<[u8; 32]>) -> bool { let _ = self.total_consistency_lock.read().unwrap(); let mut channel_state = Some(self.channel_state.lock().unwrap()); - let removed_source = channel_state.as_mut().unwrap().claimable_htlcs.remove(payment_hash); + let removed_source = channel_state.as_mut().unwrap().claimable_htlcs.remove(&(*payment_hash, *payment_secret)); if let Some(mut sources) = removed_source { for htlc in sources.drain(..) { if channel_state.is_none() { channel_state = Some(self.channel_state.lock().unwrap()); } @@ -1650,7 +1774,7 @@ impl ChannelManager where M::T //between the branches here. We should make this async and move it into the forward HTLCs //timer handling. match source { - HTLCSource::OutboundRoute { ref route, .. } => { + HTLCSource::OutboundRoute { ref path, .. } => { log_trace!(self, "Failing outbound payment HTLC with payment_hash {}", log_bytes!(payment_hash.0)); mem::drop(channel_state_lock); match &onion_error { @@ -1692,7 +1816,7 @@ impl ChannelManager where M::T self.pending_events.lock().unwrap().push( events::Event::PaymentFailed { payment_hash: payment_hash.clone(), - rejected_by_dest: route.hops.len() == 1, + rejected_by_dest: path.len() == 1, #[cfg(test)] error_code: Some(*failure_code), } @@ -1748,17 +1872,27 @@ impl ChannelManager where M::T /// motivated attackers. /// /// May panic if called except in response to a PaymentReceived event. - pub fn claim_funds(&self, payment_preimage: PaymentPreimage, expected_amount: u64) -> bool { + pub fn claim_funds(&self, payment_preimage: PaymentPreimage, payment_secret: &Option<[u8; 32]>, expected_amount: u64) -> bool { let payment_hash = PaymentHash(Sha256::hash(&payment_preimage.0).into_inner()); let _ = self.total_consistency_lock.read().unwrap(); let mut channel_state = Some(self.channel_state.lock().unwrap()); - let removed_source = channel_state.as_mut().unwrap().claimable_htlcs.remove(&payment_hash); + let removed_source = channel_state.as_mut().unwrap().claimable_htlcs.remove(&(payment_hash, *payment_secret)); if let Some(mut sources) = removed_source { + assert!(!sources.is_empty()); + let passes_value = if let &Some(ref data) = &sources[0].payment_data { + assert!(payment_secret.is_some()); + if data.total_msat == expected_amount { true } else { false } + } else { + assert!(payment_secret.is_none()); + false + }; + + let mut one_claimed = false; for htlc in sources.drain(..) { if channel_state.is_none() { channel_state = Some(self.channel_state.lock().unwrap()); } - if htlc.value < expected_amount || htlc.value > expected_amount * 2 { + if !passes_value && (htlc.value < expected_amount || htlc.value > expected_amount * 2) { let mut htlc_msat_data = byte_utils::be64_to_array(htlc.value).to_vec(); let mut height_data = byte_utils::be32_to_array(self.latest_block_height.load(Ordering::Acquire) as u32).to_vec(); htlc_msat_data.append(&mut height_data); @@ -1767,9 +1901,10 @@ impl ChannelManager where M::T HTLCFailReason::Reason { failure_code: 0x4000|15, data: htlc_msat_data }); } else { self.claim_funds_internal(channel_state.take().unwrap(), HTLCSource::PreviousHopData(htlc.src), payment_preimage); + one_claimed = true; } } - true + one_claimed } else { false } } fn claim_funds_internal(&self, mut channel_state_lock: MutexGuard>, source: HTLCSource, payment_preimage: PaymentPreimage) { @@ -1836,7 +1971,8 @@ impl ChannelManager where M::T return; }; - let _ = handle_error!(self, err, their_node_id, channel_state_lock); + mem::drop(channel_state_lock); + let _ = handle_error!(self, err, their_node_id); } /// Gets the node_id held by this ChannelManager @@ -2567,9 +2703,9 @@ impl ChannelManager where M::T #[doc(hidden)] pub fn update_fee(&self, channel_id: [u8;32], feerate_per_kw: u64) -> Result<(), APIError> { let _ = self.total_consistency_lock.read().unwrap(); - let mut channel_state_lock = self.channel_state.lock().unwrap(); let their_node_id; let err: Result<(), _> = loop { + let mut channel_state_lock = self.channel_state.lock().unwrap(); let channel_state = &mut *channel_state_lock; match channel_state.by_id.entry(channel_id) { @@ -2608,7 +2744,7 @@ impl ChannelManager where M::T return Ok(()) }; - match handle_error!(self, err, their_node_id, channel_state_lock) { + match handle_error!(self, err, their_node_id) { Ok(_) => unreachable!(), Err(e) => { Err(APIError::APIMisuseError { err: e.err })} } @@ -2671,26 +2807,33 @@ impl ChainListener for ChannelM log_trace!(self, "Block {} at height {} connected with {} txn matched", header_hash, height, txn_matched.len()); let _ = self.total_consistency_lock.read().unwrap(); let mut failed_channels = Vec::new(); + let mut timed_out_htlcs = Vec::new(); { let mut channel_lock = self.channel_state.lock().unwrap(); let channel_state = &mut *channel_lock; let short_to_id = &mut channel_state.short_to_id; let pending_msg_events = &mut channel_state.pending_msg_events; channel_state.by_id.retain(|_, channel| { - let chan_res = channel.block_connected(header, height, txn_matched, indexes_of_txn_matched); - if let Ok(Some(funding_locked)) = chan_res { - pending_msg_events.push(events::MessageSendEvent::SendFundingLocked { - node_id: channel.get_their_node_id(), - msg: funding_locked, - }); - if let Some(announcement_sigs) = self.get_announcement_sigs(channel) { - pending_msg_events.push(events::MessageSendEvent::SendAnnouncementSignatures { + let res = channel.block_connected(header, height, txn_matched, indexes_of_txn_matched); + if let Ok((chan_res, mut timed_out_pending_htlcs)) = res { + timed_out_htlcs.reserve(timed_out_pending_htlcs.len()); + for (htlc_src, payment_hash, value) in timed_out_pending_htlcs.drain(..) { + timed_out_htlcs.push((htlc_src, payment_hash, value)); + } + if let Some(funding_locked) = chan_res { + pending_msg_events.push(events::MessageSendEvent::SendFundingLocked { node_id: channel.get_their_node_id(), - msg: announcement_sigs, + msg: funding_locked, }); + if let Some(announcement_sigs) = self.get_announcement_sigs(channel) { + pending_msg_events.push(events::MessageSendEvent::SendAnnouncementSignatures { + node_id: channel.get_their_node_id(), + msg: announcement_sigs, + }); + } + short_to_id.insert(channel.get_short_channel_id().unwrap(), channel.channel_id()); } - short_to_id.insert(channel.get_short_channel_id().unwrap(), channel.channel_id()); - } else if let Err(e) = chan_res { + } else if let Err(e) = res { pending_msg_events.push(events::MessageSendEvent::HandleError { node_id: channel.get_their_node_id(), action: msgs::ErrorAction::SendErrorMessage { msg: e }, @@ -2737,10 +2880,29 @@ impl ChainListener for ChannelM } true }); + + channel_state.claimable_htlcs.retain(|&(ref payment_hash, _), htlcs| { + htlcs.retain(|htlc| { + if height >= htlc.cltv_expiry - CLTV_CLAIM_BUFFER - LATENCY_GRACE_PERIOD_BLOCKS { + timed_out_htlcs.push((HTLCSource::PreviousHopData(htlc.src.clone()), payment_hash.clone(), htlc.value)); + false + } else { true } + }); + !htlcs.is_empty() + }); } for failure in failed_channels.drain(..) { self.finish_force_close_channel(failure); } + + for (source, payment_hash, value) in timed_out_htlcs.drain(..) { + // Call it preimage_unknown as the issue, ultimately, is that the user failed to + // provide us a preimage within the cltv_expiry time window. + self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), source, &payment_hash, HTLCFailReason::Reason { + failure_code: 0x4000 | 15, + data: byte_utils::be64_to_array(value).to_vec() + }); + } self.latest_block_height.store(height as usize, Ordering::Release); *self.last_block_hash.try_lock().expect("block_(dis)connected must not be called in parallel") = header_hash; } @@ -2782,146 +2944,82 @@ impl ChainListener for ChannelM impl ChannelMessageHandler for ChannelManager where M::Target: ManyChannelMonitor { fn handle_open_channel(&self, their_node_id: &PublicKey, their_features: InitFeatures, msg: &msgs::OpenChannel) { let _ = self.total_consistency_lock.read().unwrap(); - let res = self.internal_open_channel(their_node_id, their_features, msg); - if res.is_err() { - let mut channel_state_lock = self.channel_state.lock().unwrap(); - let _ = handle_error!(self, res, *their_node_id, channel_state_lock); - } + let _ = handle_error!(self, self.internal_open_channel(their_node_id, their_features, msg), *their_node_id); } fn handle_accept_channel(&self, their_node_id: &PublicKey, their_features: InitFeatures, msg: &msgs::AcceptChannel) { let _ = self.total_consistency_lock.read().unwrap(); - let res = self.internal_accept_channel(their_node_id, their_features, msg); - if res.is_err() { - let mut channel_state_lock = self.channel_state.lock().unwrap(); - let _ = handle_error!(self, res, *their_node_id, channel_state_lock); - } + let _ = handle_error!(self, self.internal_accept_channel(their_node_id, their_features, msg), *their_node_id); } fn handle_funding_created(&self, their_node_id: &PublicKey, msg: &msgs::FundingCreated) { let _ = self.total_consistency_lock.read().unwrap(); - let res = self.internal_funding_created(their_node_id, msg); - if res.is_err() { - let mut channel_state_lock = self.channel_state.lock().unwrap(); - let _ = handle_error!(self, res, *their_node_id, channel_state_lock); - } + let _ = handle_error!(self, self.internal_funding_created(their_node_id, msg), *their_node_id); } fn handle_funding_signed(&self, their_node_id: &PublicKey, msg: &msgs::FundingSigned) { let _ = self.total_consistency_lock.read().unwrap(); - let res = self.internal_funding_signed(their_node_id, msg); - if res.is_err() { - let mut channel_state_lock = self.channel_state.lock().unwrap(); - let _ = handle_error!(self, res, *their_node_id, channel_state_lock); - } + let _ = handle_error!(self, self.internal_funding_signed(their_node_id, msg), *their_node_id); } fn handle_funding_locked(&self, their_node_id: &PublicKey, msg: &msgs::FundingLocked) { let _ = self.total_consistency_lock.read().unwrap(); - let res = self.internal_funding_locked(their_node_id, msg); - if res.is_err() { - let mut channel_state_lock = self.channel_state.lock().unwrap(); - let _ = handle_error!(self, res, *their_node_id, channel_state_lock); - } + let _ = handle_error!(self, self.internal_funding_locked(their_node_id, msg), *their_node_id); } fn handle_shutdown(&self, their_node_id: &PublicKey, msg: &msgs::Shutdown) { let _ = self.total_consistency_lock.read().unwrap(); - let res = self.internal_shutdown(their_node_id, msg); - if res.is_err() { - let mut channel_state_lock = self.channel_state.lock().unwrap(); - let _ = handle_error!(self, res, *their_node_id, channel_state_lock); - } + let _ = handle_error!(self, self.internal_shutdown(their_node_id, msg), *their_node_id); } fn handle_closing_signed(&self, their_node_id: &PublicKey, msg: &msgs::ClosingSigned) { let _ = self.total_consistency_lock.read().unwrap(); - let res = self.internal_closing_signed(their_node_id, msg); - if res.is_err() { - let mut channel_state_lock = self.channel_state.lock().unwrap(); - let _ = handle_error!(self, res, *their_node_id, channel_state_lock); - } + let _ = handle_error!(self, self.internal_closing_signed(their_node_id, msg), *their_node_id); } fn handle_update_add_htlc(&self, their_node_id: &PublicKey, msg: &msgs::UpdateAddHTLC) { let _ = self.total_consistency_lock.read().unwrap(); - let res = self.internal_update_add_htlc(their_node_id, msg); - if res.is_err() { - let mut channel_state_lock = self.channel_state.lock().unwrap(); - let _ = handle_error!(self, res, *their_node_id, channel_state_lock); - } + let _ = handle_error!(self, self.internal_update_add_htlc(their_node_id, msg), *their_node_id); } fn handle_update_fulfill_htlc(&self, their_node_id: &PublicKey, msg: &msgs::UpdateFulfillHTLC) { let _ = self.total_consistency_lock.read().unwrap(); - let res = self.internal_update_fulfill_htlc(their_node_id, msg); - if res.is_err() { - let mut channel_state_lock = self.channel_state.lock().unwrap(); - let _ = handle_error!(self, res, *their_node_id, channel_state_lock); - } + let _ = handle_error!(self, self.internal_update_fulfill_htlc(their_node_id, msg), *their_node_id); } fn handle_update_fail_htlc(&self, their_node_id: &PublicKey, msg: &msgs::UpdateFailHTLC) { let _ = self.total_consistency_lock.read().unwrap(); - let res = self.internal_update_fail_htlc(their_node_id, msg); - if res.is_err() { - let mut channel_state_lock = self.channel_state.lock().unwrap(); - let _ = handle_error!(self, res, *their_node_id, channel_state_lock); - } + let _ = handle_error!(self, self.internal_update_fail_htlc(their_node_id, msg), *their_node_id); } fn handle_update_fail_malformed_htlc(&self, their_node_id: &PublicKey, msg: &msgs::UpdateFailMalformedHTLC) { let _ = self.total_consistency_lock.read().unwrap(); - let res = self.internal_update_fail_malformed_htlc(their_node_id, msg); - if res.is_err() { - let mut channel_state_lock = self.channel_state.lock().unwrap(); - let _ = handle_error!(self, res, *their_node_id, channel_state_lock); - } + let _ = handle_error!(self, self.internal_update_fail_malformed_htlc(their_node_id, msg), *their_node_id); } fn handle_commitment_signed(&self, their_node_id: &PublicKey, msg: &msgs::CommitmentSigned) { let _ = self.total_consistency_lock.read().unwrap(); - let res = self.internal_commitment_signed(their_node_id, msg); - if res.is_err() { - let mut channel_state_lock = self.channel_state.lock().unwrap(); - let _ = handle_error!(self, res, *their_node_id, channel_state_lock); - } + let _ = handle_error!(self, self.internal_commitment_signed(their_node_id, msg), *their_node_id); } fn handle_revoke_and_ack(&self, their_node_id: &PublicKey, msg: &msgs::RevokeAndACK) { let _ = self.total_consistency_lock.read().unwrap(); - let res = self.internal_revoke_and_ack(their_node_id, msg); - if res.is_err() { - let mut channel_state_lock = self.channel_state.lock().unwrap(); - let _ = handle_error!(self, res, *their_node_id, channel_state_lock); - } + let _ = handle_error!(self, self.internal_revoke_and_ack(their_node_id, msg), *their_node_id); } fn handle_update_fee(&self, their_node_id: &PublicKey, msg: &msgs::UpdateFee) { let _ = self.total_consistency_lock.read().unwrap(); - let res = self.internal_update_fee(their_node_id, msg); - if res.is_err() { - let mut channel_state_lock = self.channel_state.lock().unwrap(); - let _ = handle_error!(self, res, *their_node_id, channel_state_lock); - } + let _ = handle_error!(self, self.internal_update_fee(their_node_id, msg), *their_node_id); } fn handle_announcement_signatures(&self, their_node_id: &PublicKey, msg: &msgs::AnnouncementSignatures) { let _ = self.total_consistency_lock.read().unwrap(); - let res = self.internal_announcement_signatures(their_node_id, msg); - if res.is_err() { - let mut channel_state_lock = self.channel_state.lock().unwrap(); - let _ = handle_error!(self, res, *their_node_id, channel_state_lock); - } + let _ = handle_error!(self, self.internal_announcement_signatures(their_node_id, msg), *their_node_id); } fn handle_channel_reestablish(&self, their_node_id: &PublicKey, msg: &msgs::ChannelReestablish) { let _ = self.total_consistency_lock.read().unwrap(); - let res = self.internal_channel_reestablish(their_node_id, msg); - if res.is_err() { - let mut channel_state_lock = self.channel_state.lock().unwrap(); - let _ = handle_error!(self, res, *their_node_id, channel_state_lock); - } + let _ = handle_error!(self, self.internal_channel_reestablish(their_node_id, msg), *their_node_id); } fn peer_disconnected(&self, their_node_id: &PublicKey, no_connection_possible: bool) { @@ -3077,9 +3175,10 @@ impl Writeable for PendingHTLCInfo { onion_packet.write(writer)?; short_channel_id.write(writer)?; }, - &PendingForwardReceiveHTLCInfo::Receive { ref payment_data } => { + &PendingForwardReceiveHTLCInfo::Receive { ref payment_data, ref incoming_cltv_expiry } => { 1u8.write(writer)?; payment_data.write(writer)?; + incoming_cltv_expiry.write(writer)?; }, } self.incoming_shared_secret.write(writer)?; @@ -3100,6 +3199,7 @@ impl Readable for PendingHTLCInfo { }, 1u8 => PendingForwardReceiveHTLCInfo::Receive { payment_data: Readable::read(reader)?, + incoming_cltv_expiry: Readable::read(reader)?, }, _ => return Err(DecodeError::InvalidValue), }, @@ -3176,9 +3276,9 @@ impl Writeable for HTLCSource { 0u8.write(writer)?; hop_data.write(writer)?; }, - &HTLCSource::OutboundRoute { ref route, ref session_priv, ref first_hop_htlc_msat } => { + &HTLCSource::OutboundRoute { ref path, ref session_priv, ref first_hop_htlc_msat } => { 1u8.write(writer)?; - route.write(writer)?; + path.write(writer)?; session_priv.write(writer)?; first_hop_htlc_msat.write(writer)?; } @@ -3192,7 +3292,7 @@ impl Readable for HTLCSource { match >::read(reader)? { 0 => Ok(HTLCSource::PreviousHopData(Readable::read(reader)?)), 1 => Ok(HTLCSource::OutboundRoute { - route: Readable::read(reader)?, + path: Readable::read(reader)?, session_priv: Readable::read(reader)?, first_hop_htlc_msat: Readable::read(reader)?, }), @@ -3309,6 +3409,7 @@ impl Writeable for ChannelManager htlc.src.write(writer)?; htlc.value.write(writer)?; htlc.payment_data.write(writer)?; + htlc.cltv_expiry.write(writer)?; } } @@ -3381,6 +3482,15 @@ pub struct ChannelManagerReadArgs<'a, ChanSigner: ChannelKeys, M: Deref> where M pub channel_monitors: &'a mut HashMap, } +// Implement ReadableArgs for an Arc'd ChannelManager to make it a bit easier to work with the +// SipmleArcChannelManager type: +impl<'a, R : ::std::io::Read, ChanSigner: ChannelKeys + Readable, M: Deref> ReadableArgs> for (Sha256dHash, Arc>) where M::Target: ManyChannelMonitor { + fn read(reader: &mut R, args: ChannelManagerReadArgs<'a, ChanSigner, M>) -> Result { + let (blockhash, chan_manager) = <(Sha256dHash, ChannelManager)>::read(reader, args)?; + Ok((blockhash, Arc::new(chan_manager))) + } +} + impl<'a, R : ::std::io::Read, ChanSigner: ChannelKeys + Readable, M: Deref> ReadableArgs> for (Sha256dHash, ChannelManager) where M::Target: ManyChannelMonitor { fn read(reader: &mut R, args: ChannelManagerReadArgs<'a, ChanSigner, M>) -> Result { let _ver: u8 = Readable::read(reader)?; @@ -3454,6 +3564,7 @@ impl<'a, R : ::std::io::Read, ChanSigner: ChannelKeys + Readable, M: Deref> R src: Readable::read(reader)?, value: Readable::read(reader)?, payment_data: Readable::read(reader)?, + cltv_expiry: Readable::read(reader)?, }); } claimable_htlcs.insert(payment_hash, previous_hops);