X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;ds=sidebyside;f=lightning%2Fsrc%2Fln%2Fchannelmanager.rs;h=a62004e6e51809faf48295a393605bbc7d365b59;hb=e1f7d4e22703331b40df46551cecb3884f7e5179;hp=d69665c5e30fc105cc0521c4e641b0c2b25538b5;hpb=9c5b3de2963a9bc381ddd51b4948235b97ecc5e6;p=rust-lightning diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index d69665c5..a62004e6 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -29,8 +29,8 @@ use chain::chaininterface::{BroadcasterInterface,ChainListener,FeeEstimator}; use chain::transaction::OutPoint; use ln::channel::{Channel, ChannelError}; use ln::channelmonitor::{ChannelMonitor, ChannelMonitorUpdateErr, ManyChannelMonitor, CLTV_CLAIM_BUFFER, LATENCY_GRACE_PERIOD_BLOCKS, ANTI_REORG_DELAY}; -use ln::router::Route; use ln::features::{InitFeatures, NodeFeatures}; +use ln::router::{Route, RouteHop}; use ln::msgs; use ln::onion_utils; use ln::msgs::{ChannelMessageHandler, DecodeError, LightningError}; @@ -76,6 +76,7 @@ enum PendingForwardReceiveHTLCInfo { }, Receive { payment_data: Option, + incoming_cltv_expiry: u32, // Used to track when we should expire pending HTLCs that go unclaimed }, } @@ -125,6 +126,7 @@ struct ClaimableHTLC { src: HTLCPreviousHopData, value: u64, payment_data: Option, + cltv_expiry: u32, } /// Tracks the inbound corresponding to an outbound HTLC @@ -132,7 +134,7 @@ struct ClaimableHTLC { pub(super) enum HTLCSource { PreviousHopData(HTLCPreviousHopData), OutboundRoute { - route: Route, + path: Vec, session_priv: SecretKey, /// Technically we can recalculate this from the route, but we cache it here to avoid /// doing a double-pass on route when we get a failure back @@ -143,7 +145,7 @@ pub(super) enum HTLCSource { impl HTLCSource { pub fn dummy() -> Self { HTLCSource::OutboundRoute { - route: Route { hops: Vec::new() }, + path: Vec::new(), session_priv: SecretKey::from_slice(&[1; 32]).unwrap(), first_hop_htlc_msat: 0, } @@ -428,15 +430,6 @@ const CHECK_CLTV_EXPIRY_SANITY: u32 = CLTV_EXPIRY_DELTA as u32 - LATENCY_GRACE_P #[allow(dead_code)] const CHECK_CLTV_EXPIRY_SANITY_2: u32 = CLTV_EXPIRY_DELTA as u32 - LATENCY_GRACE_PERIOD_BLOCKS - 2*CLTV_CLAIM_BUFFER; -macro_rules! secp_call { - ( $res: expr, $err: expr ) => { - match $res { - Ok(key) => key, - Err(_) => return Err($err), - } - }; -} - /// Details of a channel, as returned by ChannelManager::list_channels and ChannelManager::list_usable_channels pub struct ChannelDetails { /// The channel's ID (prior to funding transaction generation, this is a random 32 bytes, @@ -473,22 +466,65 @@ pub struct ChannelDetails { pub is_live: bool, } +/// If a payment fails to send, it can be in one of several states. This enum is returned as the +/// Err() type describing which state the payment is in, see the description of individual enum +/// states for more. +#[derive(Debug)] +pub enum PaymentSendFailure { + /// A parameter which was passed to send_payment was invalid, preventing us from attempting to + /// send the payment at all. No channel state has been changed or messages sent to peers, and + /// once you've changed the parameter at error, you can freely retry the payment in full. + ParameterError(APIError), + /// All paths which were attempted failed to send, with no channel state change taking place. + /// You can freely retry the payment in full (though you probably want to do so over different + /// paths than the ones selected). + AllFailedRetrySafe(Vec), + /// Some paths which were attempted failed to send, though possibly not all. At least some + /// paths have irrevocably committed to the HTLC and retrying the payment in full would result + /// in over-/re-payment. + /// + /// The results here are ordered the same as the paths in the route object which was passed to + /// send_payment, and any Errs which are not APIError::MonitorUpdateFailed can be safely + /// retried. + /// + /// Any entries which contain Err(APIError::MonitorUpdateFailed) or Ok(()) MUST NOT be retried + /// as they will result in over-/re-payment. + PartialFailure(Vec>), +} + macro_rules! handle_error { - ($self: ident, $internal: expr, $their_node_id: expr, $locked_channel_state: expr) => { + ($self: ident, $internal: expr, $their_node_id: expr) => { match $internal { Ok(msg) => Ok(msg), Err(MsgHandleErrInternal { err, shutdown_finish }) => { + let mut channel_state = None; if let Some((shutdown_res, update_option)) = shutdown_finish { $self.finish_force_close_channel(shutdown_res); if let Some(update) = update_option { - $locked_channel_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { + channel_state = Some($self.channel_state.lock().unwrap()); + channel_state.as_mut().unwrap().pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { msg: update }); } } + #[cfg(debug_assertions)] + { + // In testing, we always lock here to ensure there are no deadlocks where we + // were holding the lock coming into the macro but didn't catch it because we + // didn't generate an action and didn't have any HTLCs to fail backwards in the + // finish_force_close_channel. + if channel_state.is_none() { + channel_state = Some($self.channel_state.lock().unwrap()); + } + } log_error!($self, "{}", err.err); if let msgs::ErrorAction::IgnoreError = err.action { - } else { $locked_channel_state.pending_msg_events.push(events::MessageSendEvent::HandleError { node_id: $their_node_id, action: err.action.clone() }); } + } else { + if channel_state.is_none() { + channel_state = Some($self.channel_state.lock().unwrap()); + } + channel_state.as_mut().unwrap().pending_msg_events.push(events::MessageSendEvent::HandleError { node_id: $their_node_id, action: err.action.clone() }); + } // Return error in case higher-API need one Err(err) }, @@ -996,7 +1032,10 @@ impl ChannelManager where M::T // delay) once they've send us a commitment_signed! PendingHTLCStatus::Forward(PendingHTLCInfo { - type_data: PendingForwardReceiveHTLCInfo::Receive { payment_data }, + type_data: PendingForwardReceiveHTLCInfo::Receive { + payment_data, + incoming_cltv_expiry: msg.cltv_expiry, + }, payment_hash: msg.payment_hash.clone(), incoming_shared_secret: shared_secret, amt_to_forward: next_hop_data.amt_to_forward, @@ -1167,20 +1206,23 @@ impl ChannelManager where M::T /// payment_preimage tracking (which you should already be doing as they represent "proof of /// payment") and prevent double-sends yourself. /// - /// May generate a SendHTLCs message event on success, which should be relayed. + /// May generate SendHTLCs message(s) event on success, which should be relayed. + /// + /// Each path may have a different return value, and PaymentSendValue may return a Vec with + /// each entry matching the corresponding-index entry in the route paths. /// - /// Raises APIError::RoutError when invalid route or forward parameter - /// (cltv_delta, fee, node public key) is specified. - /// Raises APIError::ChannelUnavailable if the next-hop channel is not available for updates - /// (including due to previous monitor update failure or new permanent monitor update failure). - /// Raised APIError::MonitorUpdateFailed if a new monitor update failure prevented sending the - /// relevant updates. + /// In general, a path may raise: + /// * APIError::RouteError when an invalid route or forwarding parameter (cltv_delta, fee, + /// node public key) is specified. + /// * APIError::ChannelUnavailable if the next-hop channel is not available for updates + /// (including due to previous monitor update failure or new permanent monitor update + /// failure). + /// * APIError::MonitorUpdateFailed if a new monitor update failure prevented sending the + /// relevant updates. /// - /// In case of APIError::RouteError/APIError::ChannelUnavailable, the payment send has failed - /// and you may wish to retry via a different route immediately. - /// In case of APIError::MonitorUpdateFailed, the commitment update has been irrevocably - /// committed on our end and we're just waiting for a monitor update to send it. Do NOT retry - /// the payment via a different route unless you intend to pay twice! + /// Note that depending on the type of the PaymentSendFailure the HTLC may have been + /// irrevocably committed to on our end. In such a case, do NOT retry the payment with a + /// different route unless you intend to pay twice! /// /// payment_secret is unrelated to payment_hash (or PaymentPreimage) and exists to authenticate /// the sender to the recipient and prevent payment-probing (deanonymization) attacks. For @@ -1188,85 +1230,130 @@ impl ChannelManager where M::T /// must not contain multiple paths as otherwise the multipath data cannot be sent. /// If a payment_secret *is* provided, we assume that the invoice had the basic_mpp feature bit /// set (either as required or as available). - pub fn send_payment(&self, route: Route, payment_hash: PaymentHash, payment_secret: Option<&[u8; 32]>) -> Result<(), APIError> { - if route.hops.len() < 1 || route.hops.len() > 20 { - return Err(APIError::RouteError{err: "Route didn't go anywhere/had bogus size"}); + pub fn send_payment(&self, route: Route, payment_hash: PaymentHash, payment_secret: Option<&[u8; 32]>) -> Result<(), PaymentSendFailure> { + if route.paths.len() < 1 { + return Err(PaymentSendFailure::ParameterError(APIError::RouteError{err: "There must be at least one path to send over"})); } + if route.paths.len() > 10 { + return Err(PaymentSendFailure::ParameterError(APIError::RouteError{err: "Sending over more than 10 paths is not currently supported"})); + } + let mut total_value = 0; let our_node_id = self.get_our_node_id(); - for (idx, hop) in route.hops.iter().enumerate() { - if idx != route.hops.len() - 1 && hop.pubkey == our_node_id { - return Err(APIError::RouteError{err: "Route went through us but wasn't a simple rebalance loop to us"}); + for path in route.paths.iter() { + if path.len() < 1 || path.len() > 20 { + return Err(PaymentSendFailure::ParameterError(APIError::RouteError{err: "Path didn't go anywhere/had bogus size"})); + } + for (idx, hop) in path.iter().enumerate() { + if idx != path.len() - 1 && hop.pubkey == our_node_id { + return Err(PaymentSendFailure::ParameterError(APIError::RouteError{err: "Path went through us but wasn't a simple rebalance loop to us"})); + } } + total_value += path.last().unwrap().fee_msat; } - - let (session_priv, prng_seed) = self.keys_manager.get_onion_rand(); - let cur_height = self.latest_block_height.load(Ordering::Acquire) as u32 + 1; + let mut results = Vec::new(); + 'path_loop: for path in route.paths.iter() { + macro_rules! check_res_push { + ($res: expr) => { match $res { + Ok(r) => r, + Err(e) => { + results.push(Err(e)); + continue 'path_loop; + }, + } + } + } - let onion_keys = secp_call!(onion_utils::construct_onion_keys(&self.secp_ctx, &route, &session_priv), - APIError::RouteError{err: "Pubkey along hop was maliciously selected"}); - let (onion_payloads, htlc_msat, htlc_cltv) = onion_utils::build_onion_payloads(&route, payment_secret, cur_height)?; - if onion_utils::route_size_insane(&onion_payloads) { - return Err(APIError::RouteError{err: "Route had too large size once"}); - } - let onion_packet = onion_utils::construct_onion_packet(onion_payloads, onion_keys, prng_seed, &payment_hash); + log_trace!(self, "Attempting to send payment for path with next hop {}", path.first().unwrap().short_channel_id); + let (session_priv, prng_seed) = self.keys_manager.get_onion_rand(); - let _ = self.total_consistency_lock.read().unwrap(); + let onion_keys = check_res_push!(onion_utils::construct_onion_keys(&self.secp_ctx, &path, &session_priv) + .map_err(|_| APIError::RouteError{err: "Pubkey along hop was maliciously selected"})); + let (onion_payloads, htlc_msat, htlc_cltv) = check_res_push!(onion_utils::build_onion_payloads(&path, total_value, payment_secret, cur_height)); + if onion_utils::route_size_insane(&onion_payloads) { + check_res_push!(Err(APIError::RouteError{err: "Route had too large size once"})); + } + let onion_packet = onion_utils::construct_onion_packet(onion_payloads, onion_keys, prng_seed, &payment_hash); - let mut channel_lock = self.channel_state.lock().unwrap(); - let err: Result<(), _> = loop { + let _ = self.total_consistency_lock.read().unwrap(); - let id = match channel_lock.short_to_id.get(&route.hops.first().unwrap().short_channel_id) { - None => return Err(APIError::ChannelUnavailable{err: "No channel available with first hop!"}), - Some(id) => id.clone(), - }; + let err: Result<(), _> = loop { + let mut channel_lock = self.channel_state.lock().unwrap(); + let id = match channel_lock.short_to_id.get(&path.first().unwrap().short_channel_id) { + None => check_res_push!(Err(APIError::ChannelUnavailable{err: "No channel available with first hop!"})), + Some(id) => id.clone(), + }; - let channel_state = &mut *channel_lock; - if let hash_map::Entry::Occupied(mut chan) = channel_state.by_id.entry(id) { - match { - if chan.get().get_their_node_id() != route.hops.first().unwrap().pubkey { - return Err(APIError::RouteError{err: "Node ID mismatch on first hop!"}); - } - if !chan.get().is_live() { - return Err(APIError::ChannelUnavailable{err: "Peer for first hop currently disconnected/pending monitor update!"}); - } - break_chan_entry!(self, chan.get_mut().send_htlc_and_commit(htlc_msat, payment_hash.clone(), htlc_cltv, HTLCSource::OutboundRoute { - route: route.clone(), - session_priv: session_priv.clone(), - first_hop_htlc_msat: htlc_msat, - }, onion_packet), channel_state, chan) - } { - Some((update_add, commitment_signed, chan_monitor)) => { - if let Err(e) = self.monitor.add_update_monitor(chan_monitor.get_funding_txo().unwrap(), chan_monitor) { - maybe_break_monitor_err!(self, e, channel_state, chan, RAACommitmentOrder::CommitmentFirst, false, true); - // Note that MonitorUpdateFailed here indicates (per function docs) - // that we will resent the commitment update once we unfree monitor - // updating, so we have to take special care that we don't return - // something else in case we will resend later! - return Err(APIError::MonitorUpdateFailed); + let channel_state = &mut *channel_lock; + if let hash_map::Entry::Occupied(mut chan) = channel_state.by_id.entry(id) { + match { + if chan.get().get_their_node_id() != path.first().unwrap().pubkey { + check_res_push!(Err(APIError::RouteError{err: "Node ID mismatch on first hop!"})); + } + if !chan.get().is_live() { + check_res_push!(Err(APIError::ChannelUnavailable{err: "Peer for first hop currently disconnected/pending monitor update!"})); } + break_chan_entry!(self, chan.get_mut().send_htlc_and_commit(htlc_msat, payment_hash.clone(), htlc_cltv, HTLCSource::OutboundRoute { + path: path.clone(), + session_priv: session_priv.clone(), + first_hop_htlc_msat: htlc_msat, + }, onion_packet), channel_state, chan) + } { + Some((update_add, commitment_signed, chan_monitor)) => { + if let Err(e) = self.monitor.add_update_monitor(chan_monitor.get_funding_txo().unwrap(), chan_monitor) { + maybe_break_monitor_err!(self, e, channel_state, chan, RAACommitmentOrder::CommitmentFirst, false, true); + // Note that MonitorUpdateFailed here indicates (per function docs) + // that we will resent the commitment update once we unfree monitor + // updating, so we have to take special care that we don't return + // something else in case we will resend later! + check_res_push!(Err(APIError::MonitorUpdateFailed)); + } - channel_state.pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs { - node_id: route.hops.first().unwrap().pubkey, - updates: msgs::CommitmentUpdate { - update_add_htlcs: vec![update_add], - update_fulfill_htlcs: Vec::new(), - update_fail_htlcs: Vec::new(), - update_fail_malformed_htlcs: Vec::new(), - update_fee: None, - commitment_signed, - }, - }); - }, - None => {}, - } - } else { unreachable!(); } - return Ok(()); - }; + channel_state.pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs { + node_id: path.first().unwrap().pubkey, + updates: msgs::CommitmentUpdate { + update_add_htlcs: vec![update_add], + update_fulfill_htlcs: Vec::new(), + update_fail_htlcs: Vec::new(), + update_fail_malformed_htlcs: Vec::new(), + update_fee: None, + commitment_signed, + }, + }); + }, + None => {}, + } + } else { unreachable!(); } + results.push(Ok(())); + continue 'path_loop; + }; - match handle_error!(self, err, route.hops.first().unwrap().pubkey, channel_lock) { - Ok(_) => unreachable!(), - Err(e) => { Err(APIError::ChannelUnavailable { err: e.err }) } + match handle_error!(self, err, path.first().unwrap().pubkey) { + Ok(_) => unreachable!(), + Err(e) => { + check_res_push!(Err(APIError::ChannelUnavailable { err: e.err })); + }, + } + } + let mut has_ok = false; + let mut has_err = false; + for res in results.iter() { + if res.is_ok() { has_ok = true; } + if res.is_err() { has_err = true; } + if let &Err(APIError::MonitorUpdateFailed) = res { + // MonitorUpdateFailed is inherently unsafe to retry, so we call it a + // PartialFailure. + has_err = true; + has_ok = true; + break; + } + } + if has_err && has_ok { + Err(PaymentSendFailure::PartialFailure(results)) + } else if has_err { + Err(PaymentSendFailure::AllFailedRetrySafe(results.drain(..).map(|r| r.unwrap_err()).collect())) + } else { + Ok(()) } } @@ -1283,8 +1370,7 @@ impl ChannelManager where M::T let _ = self.total_consistency_lock.read().unwrap(); let (mut chan, msg, chan_monitor) = { - let mut channel_state = self.channel_state.lock().unwrap(); - let (res, chan) = match channel_state.by_id.remove(temporary_channel_id) { + let (res, chan) = match self.channel_state.lock().unwrap().by_id.remove(temporary_channel_id) { Some(mut chan) => { (chan.get_outbound_funding_created(funding_txo) .map_err(|e| if let ChannelError::Close(msg) = e { @@ -1294,7 +1380,7 @@ impl ChannelManager where M::T }, None => return }; - match handle_error!(self, res, chan.get_their_node_id(), channel_state) { + match handle_error!(self, res, chan.get_their_node_id()) { Ok(funding_msg) => { (chan, funding_msg.0, funding_msg.1) }, @@ -1306,12 +1392,9 @@ impl ChannelManager where M::T if let Err(e) = self.monitor.add_update_monitor(chan_monitor.get_funding_txo().unwrap(), chan_monitor) { match e { ChannelMonitorUpdateErr::PermanentFailure => { - { - let mut channel_state = self.channel_state.lock().unwrap(); - match handle_error!(self, Err(MsgHandleErrInternal::from_finish_shutdown("ChannelMonitor storage failure", *temporary_channel_id, chan.force_shutdown(), None)), chan.get_their_node_id(), channel_state) { - Err(_) => { return; }, - Ok(()) => unreachable!(), - } + match handle_error!(self, Err(MsgHandleErrInternal::from_finish_shutdown("ChannelMonitor storage failure", *temporary_channel_id, chan.force_shutdown(), None)), chan.get_their_node_id()) { + Err(_) => { return; }, + Ok(()) => unreachable!(), } }, ChannelMonitorUpdateErr::TemporaryFailure => { @@ -1529,10 +1612,8 @@ impl ChannelManager where M::T }, ChannelError::CloseDelayBroadcast { .. } => { panic!("Wait is only generated on receipt of channel_reestablish, which is handled by try_chan_entry, we don't bother to support it here"); } }; - match handle_error!(self, err, their_node_id, channel_state) { - Ok(_) => unreachable!(), - Err(_) => { continue; }, - } + handle_errors.push((their_node_id, err)); + continue; } }; if let Err(e) = self.monitor.add_update_monitor(monitor.get_funding_txo().unwrap(), monitor) { @@ -1558,7 +1639,7 @@ impl ChannelManager where M::T for forward_info in pending_forwards.drain(..) { match forward_info { HTLCForwardInfo::AddHTLC { prev_short_channel_id, prev_htlc_id, forward_info: PendingHTLCInfo { - type_data: PendingForwardReceiveHTLCInfo::Receive { payment_data }, + type_data: PendingForwardReceiveHTLCInfo::Receive { payment_data, incoming_cltv_expiry }, incoming_shared_secret, payment_hash, amt_to_forward, .. }, } => { let prev_hop_data = HTLCPreviousHopData { short_channel_id: prev_short_channel_id, @@ -1574,6 +1655,7 @@ impl ChannelManager where M::T src: prev_hop_data, value: amt_to_forward, payment_data: payment_data.clone(), + cltv_expiry: incoming_cltv_expiry, }); if let &Some(ref data) = &payment_data { for htlc in htlcs.iter() { @@ -1624,11 +1706,8 @@ impl ChannelManager where M::T self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), htlc_source, &payment_hash, failure_reason); } - if handle_errors.len() > 0 { - let mut channel_state_lock = self.channel_state.lock().unwrap(); - for (their_node_id, err) in handle_errors.drain(..) { - let _ = handle_error!(self, err, their_node_id, channel_state_lock); - } + for (their_node_id, err) in handle_errors.drain(..) { + let _ = handle_error!(self, err, their_node_id); } if new_events.is_empty() { return } @@ -1694,7 +1773,7 @@ impl ChannelManager where M::T //between the branches here. We should make this async and move it into the forward HTLCs //timer handling. match source { - HTLCSource::OutboundRoute { ref route, .. } => { + HTLCSource::OutboundRoute { ref path, .. } => { log_trace!(self, "Failing outbound payment HTLC with payment_hash {}", log_bytes!(payment_hash.0)); mem::drop(channel_state_lock); match &onion_error { @@ -1736,7 +1815,7 @@ impl ChannelManager where M::T self.pending_events.lock().unwrap().push( events::Event::PaymentFailed { payment_hash: payment_hash.clone(), - rejected_by_dest: route.hops.len() == 1, + rejected_by_dest: path.len() == 1, #[cfg(test)] error_code: Some(*failure_code), } @@ -1800,9 +1879,19 @@ impl ChannelManager where M::T let mut channel_state = Some(self.channel_state.lock().unwrap()); let removed_source = channel_state.as_mut().unwrap().claimable_htlcs.remove(&(payment_hash, *payment_secret)); if let Some(mut sources) = removed_source { + assert!(!sources.is_empty()); + let passes_value = if let &Some(ref data) = &sources[0].payment_data { + assert!(payment_secret.is_some()); + if data.total_msat == expected_amount { true } else { false } + } else { + assert!(payment_secret.is_none()); + false + }; + + let mut one_claimed = false; for htlc in sources.drain(..) { if channel_state.is_none() { channel_state = Some(self.channel_state.lock().unwrap()); } - if htlc.value < expected_amount || htlc.value > expected_amount * 2 { + if !passes_value && (htlc.value < expected_amount || htlc.value > expected_amount * 2) { let mut htlc_msat_data = byte_utils::be64_to_array(htlc.value).to_vec(); let mut height_data = byte_utils::be32_to_array(self.latest_block_height.load(Ordering::Acquire) as u32).to_vec(); htlc_msat_data.append(&mut height_data); @@ -1811,9 +1900,10 @@ impl ChannelManager where M::T HTLCFailReason::Reason { failure_code: 0x4000|15, data: htlc_msat_data }); } else { self.claim_funds_internal(channel_state.take().unwrap(), HTLCSource::PreviousHopData(htlc.src), payment_preimage); + one_claimed = true; } } - true + one_claimed } else { false } } fn claim_funds_internal(&self, mut channel_state_lock: MutexGuard>, source: HTLCSource, payment_preimage: PaymentPreimage) { @@ -1880,7 +1970,8 @@ impl ChannelManager where M::T return; }; - let _ = handle_error!(self, err, their_node_id, channel_state_lock); + mem::drop(channel_state_lock); + let _ = handle_error!(self, err, their_node_id); } /// Gets the node_id held by this ChannelManager @@ -2611,9 +2702,9 @@ impl ChannelManager where M::T #[doc(hidden)] pub fn update_fee(&self, channel_id: [u8;32], feerate_per_kw: u64) -> Result<(), APIError> { let _ = self.total_consistency_lock.read().unwrap(); - let mut channel_state_lock = self.channel_state.lock().unwrap(); let their_node_id; let err: Result<(), _> = loop { + let mut channel_state_lock = self.channel_state.lock().unwrap(); let channel_state = &mut *channel_state_lock; match channel_state.by_id.entry(channel_id) { @@ -2652,7 +2743,7 @@ impl ChannelManager where M::T return Ok(()) }; - match handle_error!(self, err, their_node_id, channel_state_lock) { + match handle_error!(self, err, their_node_id) { Ok(_) => unreachable!(), Err(e) => { Err(APIError::APIMisuseError { err: e.err })} } @@ -2715,26 +2806,33 @@ impl ChainListener for ChannelM log_trace!(self, "Block {} at height {} connected with {} txn matched", header_hash, height, txn_matched.len()); let _ = self.total_consistency_lock.read().unwrap(); let mut failed_channels = Vec::new(); + let mut timed_out_htlcs = Vec::new(); { let mut channel_lock = self.channel_state.lock().unwrap(); let channel_state = &mut *channel_lock; let short_to_id = &mut channel_state.short_to_id; let pending_msg_events = &mut channel_state.pending_msg_events; channel_state.by_id.retain(|_, channel| { - let chan_res = channel.block_connected(header, height, txn_matched, indexes_of_txn_matched); - if let Ok(Some(funding_locked)) = chan_res { - pending_msg_events.push(events::MessageSendEvent::SendFundingLocked { - node_id: channel.get_their_node_id(), - msg: funding_locked, - }); - if let Some(announcement_sigs) = self.get_announcement_sigs(channel) { - pending_msg_events.push(events::MessageSendEvent::SendAnnouncementSignatures { + let res = channel.block_connected(header, height, txn_matched, indexes_of_txn_matched); + if let Ok((chan_res, mut timed_out_pending_htlcs)) = res { + timed_out_htlcs.reserve(timed_out_pending_htlcs.len()); + for (htlc_src, payment_hash, value) in timed_out_pending_htlcs.drain(..) { + timed_out_htlcs.push((htlc_src, payment_hash, value)); + } + if let Some(funding_locked) = chan_res { + pending_msg_events.push(events::MessageSendEvent::SendFundingLocked { node_id: channel.get_their_node_id(), - msg: announcement_sigs, + msg: funding_locked, }); + if let Some(announcement_sigs) = self.get_announcement_sigs(channel) { + pending_msg_events.push(events::MessageSendEvent::SendAnnouncementSignatures { + node_id: channel.get_their_node_id(), + msg: announcement_sigs, + }); + } + short_to_id.insert(channel.get_short_channel_id().unwrap(), channel.channel_id()); } - short_to_id.insert(channel.get_short_channel_id().unwrap(), channel.channel_id()); - } else if let Err(e) = chan_res { + } else if let Err(e) = res { pending_msg_events.push(events::MessageSendEvent::HandleError { node_id: channel.get_their_node_id(), action: msgs::ErrorAction::SendErrorMessage { msg: e }, @@ -2781,10 +2879,29 @@ impl ChainListener for ChannelM } true }); + + channel_state.claimable_htlcs.retain(|&(ref payment_hash, _), htlcs| { + htlcs.retain(|htlc| { + if height >= htlc.cltv_expiry - CLTV_CLAIM_BUFFER - LATENCY_GRACE_PERIOD_BLOCKS { + timed_out_htlcs.push((HTLCSource::PreviousHopData(htlc.src.clone()), payment_hash.clone(), htlc.value)); + false + } else { true } + }); + !htlcs.is_empty() + }); } for failure in failed_channels.drain(..) { self.finish_force_close_channel(failure); } + + for (source, payment_hash, value) in timed_out_htlcs.drain(..) { + // Call it preimage_unknown as the issue, ultimately, is that the user failed to + // provide us a preimage within the cltv_expiry time window. + self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), source, &payment_hash, HTLCFailReason::Reason { + failure_code: 0x4000 | 15, + data: byte_utils::be64_to_array(value).to_vec() + }); + } self.latest_block_height.store(height as usize, Ordering::Release); *self.last_block_hash.try_lock().expect("block_(dis)connected must not be called in parallel") = header_hash; } @@ -2826,146 +2943,82 @@ impl ChainListener for ChannelM impl ChannelMessageHandler for ChannelManager where M::Target: ManyChannelMonitor { fn handle_open_channel(&self, their_node_id: &PublicKey, their_features: InitFeatures, msg: &msgs::OpenChannel) { let _ = self.total_consistency_lock.read().unwrap(); - let res = self.internal_open_channel(their_node_id, their_features, msg); - if res.is_err() { - let mut channel_state_lock = self.channel_state.lock().unwrap(); - let _ = handle_error!(self, res, *their_node_id, channel_state_lock); - } + let _ = handle_error!(self, self.internal_open_channel(their_node_id, their_features, msg), *their_node_id); } fn handle_accept_channel(&self, their_node_id: &PublicKey, their_features: InitFeatures, msg: &msgs::AcceptChannel) { let _ = self.total_consistency_lock.read().unwrap(); - let res = self.internal_accept_channel(their_node_id, their_features, msg); - if res.is_err() { - let mut channel_state_lock = self.channel_state.lock().unwrap(); - let _ = handle_error!(self, res, *their_node_id, channel_state_lock); - } + let _ = handle_error!(self, self.internal_accept_channel(their_node_id, their_features, msg), *their_node_id); } fn handle_funding_created(&self, their_node_id: &PublicKey, msg: &msgs::FundingCreated) { let _ = self.total_consistency_lock.read().unwrap(); - let res = self.internal_funding_created(their_node_id, msg); - if res.is_err() { - let mut channel_state_lock = self.channel_state.lock().unwrap(); - let _ = handle_error!(self, res, *their_node_id, channel_state_lock); - } + let _ = handle_error!(self, self.internal_funding_created(their_node_id, msg), *their_node_id); } fn handle_funding_signed(&self, their_node_id: &PublicKey, msg: &msgs::FundingSigned) { let _ = self.total_consistency_lock.read().unwrap(); - let res = self.internal_funding_signed(their_node_id, msg); - if res.is_err() { - let mut channel_state_lock = self.channel_state.lock().unwrap(); - let _ = handle_error!(self, res, *their_node_id, channel_state_lock); - } + let _ = handle_error!(self, self.internal_funding_signed(their_node_id, msg), *their_node_id); } fn handle_funding_locked(&self, their_node_id: &PublicKey, msg: &msgs::FundingLocked) { let _ = self.total_consistency_lock.read().unwrap(); - let res = self.internal_funding_locked(their_node_id, msg); - if res.is_err() { - let mut channel_state_lock = self.channel_state.lock().unwrap(); - let _ = handle_error!(self, res, *their_node_id, channel_state_lock); - } + let _ = handle_error!(self, self.internal_funding_locked(their_node_id, msg), *their_node_id); } fn handle_shutdown(&self, their_node_id: &PublicKey, msg: &msgs::Shutdown) { let _ = self.total_consistency_lock.read().unwrap(); - let res = self.internal_shutdown(their_node_id, msg); - if res.is_err() { - let mut channel_state_lock = self.channel_state.lock().unwrap(); - let _ = handle_error!(self, res, *their_node_id, channel_state_lock); - } + let _ = handle_error!(self, self.internal_shutdown(their_node_id, msg), *their_node_id); } fn handle_closing_signed(&self, their_node_id: &PublicKey, msg: &msgs::ClosingSigned) { let _ = self.total_consistency_lock.read().unwrap(); - let res = self.internal_closing_signed(their_node_id, msg); - if res.is_err() { - let mut channel_state_lock = self.channel_state.lock().unwrap(); - let _ = handle_error!(self, res, *their_node_id, channel_state_lock); - } + let _ = handle_error!(self, self.internal_closing_signed(their_node_id, msg), *their_node_id); } fn handle_update_add_htlc(&self, their_node_id: &PublicKey, msg: &msgs::UpdateAddHTLC) { let _ = self.total_consistency_lock.read().unwrap(); - let res = self.internal_update_add_htlc(their_node_id, msg); - if res.is_err() { - let mut channel_state_lock = self.channel_state.lock().unwrap(); - let _ = handle_error!(self, res, *their_node_id, channel_state_lock); - } + let _ = handle_error!(self, self.internal_update_add_htlc(their_node_id, msg), *their_node_id); } fn handle_update_fulfill_htlc(&self, their_node_id: &PublicKey, msg: &msgs::UpdateFulfillHTLC) { let _ = self.total_consistency_lock.read().unwrap(); - let res = self.internal_update_fulfill_htlc(their_node_id, msg); - if res.is_err() { - let mut channel_state_lock = self.channel_state.lock().unwrap(); - let _ = handle_error!(self, res, *their_node_id, channel_state_lock); - } + let _ = handle_error!(self, self.internal_update_fulfill_htlc(their_node_id, msg), *their_node_id); } fn handle_update_fail_htlc(&self, their_node_id: &PublicKey, msg: &msgs::UpdateFailHTLC) { let _ = self.total_consistency_lock.read().unwrap(); - let res = self.internal_update_fail_htlc(their_node_id, msg); - if res.is_err() { - let mut channel_state_lock = self.channel_state.lock().unwrap(); - let _ = handle_error!(self, res, *their_node_id, channel_state_lock); - } + let _ = handle_error!(self, self.internal_update_fail_htlc(their_node_id, msg), *their_node_id); } fn handle_update_fail_malformed_htlc(&self, their_node_id: &PublicKey, msg: &msgs::UpdateFailMalformedHTLC) { let _ = self.total_consistency_lock.read().unwrap(); - let res = self.internal_update_fail_malformed_htlc(their_node_id, msg); - if res.is_err() { - let mut channel_state_lock = self.channel_state.lock().unwrap(); - let _ = handle_error!(self, res, *their_node_id, channel_state_lock); - } + let _ = handle_error!(self, self.internal_update_fail_malformed_htlc(their_node_id, msg), *their_node_id); } fn handle_commitment_signed(&self, their_node_id: &PublicKey, msg: &msgs::CommitmentSigned) { let _ = self.total_consistency_lock.read().unwrap(); - let res = self.internal_commitment_signed(their_node_id, msg); - if res.is_err() { - let mut channel_state_lock = self.channel_state.lock().unwrap(); - let _ = handle_error!(self, res, *their_node_id, channel_state_lock); - } + let _ = handle_error!(self, self.internal_commitment_signed(their_node_id, msg), *their_node_id); } fn handle_revoke_and_ack(&self, their_node_id: &PublicKey, msg: &msgs::RevokeAndACK) { let _ = self.total_consistency_lock.read().unwrap(); - let res = self.internal_revoke_and_ack(their_node_id, msg); - if res.is_err() { - let mut channel_state_lock = self.channel_state.lock().unwrap(); - let _ = handle_error!(self, res, *their_node_id, channel_state_lock); - } + let _ = handle_error!(self, self.internal_revoke_and_ack(their_node_id, msg), *their_node_id); } fn handle_update_fee(&self, their_node_id: &PublicKey, msg: &msgs::UpdateFee) { let _ = self.total_consistency_lock.read().unwrap(); - let res = self.internal_update_fee(their_node_id, msg); - if res.is_err() { - let mut channel_state_lock = self.channel_state.lock().unwrap(); - let _ = handle_error!(self, res, *their_node_id, channel_state_lock); - } + let _ = handle_error!(self, self.internal_update_fee(their_node_id, msg), *their_node_id); } fn handle_announcement_signatures(&self, their_node_id: &PublicKey, msg: &msgs::AnnouncementSignatures) { let _ = self.total_consistency_lock.read().unwrap(); - let res = self.internal_announcement_signatures(their_node_id, msg); - if res.is_err() { - let mut channel_state_lock = self.channel_state.lock().unwrap(); - let _ = handle_error!(self, res, *their_node_id, channel_state_lock); - } + let _ = handle_error!(self, self.internal_announcement_signatures(their_node_id, msg), *their_node_id); } fn handle_channel_reestablish(&self, their_node_id: &PublicKey, msg: &msgs::ChannelReestablish) { let _ = self.total_consistency_lock.read().unwrap(); - let res = self.internal_channel_reestablish(their_node_id, msg); - if res.is_err() { - let mut channel_state_lock = self.channel_state.lock().unwrap(); - let _ = handle_error!(self, res, *their_node_id, channel_state_lock); - } + let _ = handle_error!(self, self.internal_channel_reestablish(their_node_id, msg), *their_node_id); } fn peer_disconnected(&self, their_node_id: &PublicKey, no_connection_possible: bool) { @@ -3121,9 +3174,10 @@ impl Writeable for PendingHTLCInfo { onion_packet.write(writer)?; short_channel_id.write(writer)?; }, - &PendingForwardReceiveHTLCInfo::Receive { ref payment_data } => { + &PendingForwardReceiveHTLCInfo::Receive { ref payment_data, ref incoming_cltv_expiry } => { 1u8.write(writer)?; payment_data.write(writer)?; + incoming_cltv_expiry.write(writer)?; }, } self.incoming_shared_secret.write(writer)?; @@ -3144,6 +3198,7 @@ impl Readable for PendingHTLCInfo { }, 1u8 => PendingForwardReceiveHTLCInfo::Receive { payment_data: Readable::read(reader)?, + incoming_cltv_expiry: Readable::read(reader)?, }, _ => return Err(DecodeError::InvalidValue), }, @@ -3220,9 +3275,9 @@ impl Writeable for HTLCSource { 0u8.write(writer)?; hop_data.write(writer)?; }, - &HTLCSource::OutboundRoute { ref route, ref session_priv, ref first_hop_htlc_msat } => { + &HTLCSource::OutboundRoute { ref path, ref session_priv, ref first_hop_htlc_msat } => { 1u8.write(writer)?; - route.write(writer)?; + path.write(writer)?; session_priv.write(writer)?; first_hop_htlc_msat.write(writer)?; } @@ -3236,7 +3291,7 @@ impl Readable for HTLCSource { match >::read(reader)? { 0 => Ok(HTLCSource::PreviousHopData(Readable::read(reader)?)), 1 => Ok(HTLCSource::OutboundRoute { - route: Readable::read(reader)?, + path: Readable::read(reader)?, session_priv: Readable::read(reader)?, first_hop_htlc_msat: Readable::read(reader)?, }), @@ -3353,6 +3408,7 @@ impl Writeable for ChannelManager htlc.src.write(writer)?; htlc.value.write(writer)?; htlc.payment_data.write(writer)?; + htlc.cltv_expiry.write(writer)?; } } @@ -3498,6 +3554,7 @@ impl<'a, R : ::std::io::Read, ChanSigner: ChannelKeys + Readable, M: Deref> R src: Readable::read(reader)?, value: Readable::read(reader)?, payment_data: Readable::read(reader)?, + cltv_expiry: Readable::read(reader)?, }); } claimable_htlcs.insert(payment_hash, previous_hops);