Refactor payment-claim logic to ensure MPP-claim atomicity
[rust-lightning] / lightning / src / ln / channelmanager.rs
index efcf2159a068fa21b6f18e9674c98960a0f2f501..918f8b28f37304c319e065f4b89fda05f3b0de98 100644 (file)
@@ -30,7 +30,7 @@ use chain::transaction::OutPoint;
 use ln::channel::{Channel, ChannelError};
 use ln::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, ChannelMonitorUpdateErr, ManyChannelMonitor, CLTV_CLAIM_BUFFER, LATENCY_GRACE_PERIOD_BLOCKS, ANTI_REORG_DELAY};
 use ln::features::{InitFeatures, NodeFeatures};
-use ln::router::Route;
+use ln::router::{Route, RouteHop};
 use ln::msgs;
 use ln::onion_utils;
 use ln::msgs::{ChannelMessageHandler, DecodeError, LightningError};
@@ -136,7 +136,7 @@ struct ClaimableHTLC {
 pub(super) enum HTLCSource {
        PreviousHopData(HTLCPreviousHopData),
        OutboundRoute {
-               route: Route,
+               path: Vec<RouteHop>,
                session_priv: SecretKey,
                /// Technically we can recalculate this from the route, but we cache it here to avoid
                /// doing a double-pass on route when we get a failure back
@@ -147,7 +147,7 @@ pub(super) enum HTLCSource {
 impl HTLCSource {
        pub fn dummy() -> Self {
                HTLCSource::OutboundRoute {
-                       route: Route { hops: Vec::new() },
+                       path: Vec::new(),
                        session_priv: SecretKey::from_slice(&[1; 32]).unwrap(),
                        first_hop_htlc_msat: 0,
                }
@@ -1231,13 +1231,16 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref> ChannelMan
        /// bit set (either as required or as available). If multiple paths are present in the Route,
        /// we assume the invoice had the basic_mpp feature set.
        pub fn send_payment(&self, route: Route, payment_hash: PaymentHash, payment_secret: &Option<PaymentSecret>) -> Result<(), APIError> {
-               if route.hops.len() < 1 || route.hops.len() > 20 {
-                       return Err(APIError::RouteError{err: "Route didn't go anywhere/had bogus size"});
+               if route.paths.len() < 1 || route.paths.len() > 1 {
+                       return Err(APIError::RouteError{err: "We currently don't support MPP, and we need at least one path"});
+               }
+               if route.paths[0].len() < 1 || route.paths[0].len() > 20 {
+                       return Err(APIError::RouteError{err: "Path didn't go anywhere/had bogus size"});
                }
                let our_node_id = self.get_our_node_id();
-               for (idx, hop) in route.hops.iter().enumerate() {
-                       if idx != route.hops.len() - 1 && hop.pubkey == our_node_id {
-                               return Err(APIError::RouteError{err: "Route went through us but wasn't a simple rebalance loop to us"});
+               for (idx, hop) in route.paths[0].iter().enumerate() {
+                       if idx != route.paths[0].len() - 1 && hop.pubkey == our_node_id {
+                               return Err(APIError::RouteError{err: "Path went through us but wasn't a simple rebalance loop to us"});
                        }
                }
 
@@ -1245,9 +1248,9 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref> ChannelMan
 
                let cur_height = self.latest_block_height.load(Ordering::Acquire) as u32 + 1;
 
-               let onion_keys = secp_call!(onion_utils::construct_onion_keys(&self.secp_ctx, &route, &session_priv),
+               let onion_keys = secp_call!(onion_utils::construct_onion_keys(&self.secp_ctx, &route.paths[0], &session_priv),
                                APIError::RouteError{err: "Pubkey along hop was maliciously selected"});
-               let (onion_payloads, htlc_msat, htlc_cltv) = onion_utils::build_onion_payloads(&route, payment_secret, cur_height)?;
+               let (onion_payloads, htlc_msat, htlc_cltv) = onion_utils::build_onion_payloads(&route.paths[0], payment_secret, cur_height)?;
                if onion_utils::route_size_insane(&onion_payloads) {
                        return Err(APIError::RouteError{err: "Route size too large considering onion data"});
                }
@@ -1257,7 +1260,7 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref> ChannelMan
 
                let err: Result<(), _> = loop {
                        let mut channel_lock = self.channel_state.lock().unwrap();
-                       let id = match channel_lock.short_to_id.get(&route.hops.first().unwrap().short_channel_id) {
+                       let id = match channel_lock.short_to_id.get(&route.paths[0].first().unwrap().short_channel_id) {
                                None => return Err(APIError::ChannelUnavailable{err: "No channel available with first hop!"}),
                                Some(id) => id.clone(),
                        };
@@ -1265,14 +1268,14 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref> ChannelMan
                        let channel_state = &mut *channel_lock;
                        if let hash_map::Entry::Occupied(mut chan) = channel_state.by_id.entry(id) {
                                match {
-                                       if chan.get().get_their_node_id() != route.hops.first().unwrap().pubkey {
+                                       if chan.get().get_their_node_id() != route.paths[0].first().unwrap().pubkey {
                                                return Err(APIError::RouteError{err: "Node ID mismatch on first hop!"});
                                        }
                                        if !chan.get().is_live() {
                                                return Err(APIError::ChannelUnavailable{err: "Peer for first hop currently disconnected/pending monitor update!"});
                                        }
                                        break_chan_entry!(self, chan.get_mut().send_htlc_and_commit(htlc_msat, payment_hash.clone(), htlc_cltv, HTLCSource::OutboundRoute {
-                                               route: route.clone(),
+                                               path: route.paths[0].clone(),
                                                session_priv: session_priv.clone(),
                                                first_hop_htlc_msat: htlc_msat,
                                        }, onion_packet), channel_state, chan)
@@ -1288,7 +1291,7 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref> ChannelMan
                                                }
 
                                                channel_state.pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs {
-                                                       node_id: route.hops.first().unwrap().pubkey,
+                                                       node_id: route.paths[0].first().unwrap().pubkey,
                                                        updates: msgs::CommitmentUpdate {
                                                                update_add_htlcs: vec![update_add],
                                                                update_fulfill_htlcs: Vec::new(),
@@ -1305,7 +1308,7 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref> ChannelMan
                        return Ok(());
                };
 
-               match handle_error!(self, err, route.hops.first().unwrap().pubkey) {
+               match handle_error!(self, err, route.paths[0].first().unwrap().pubkey) {
                        Ok(_) => unreachable!(),
                        Err(e) => { Err(APIError::ChannelUnavailable { err: e.err }) }
                }
@@ -1750,7 +1753,7 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref> ChannelMan
                //between the branches here. We should make this async and move it into the forward HTLCs
                //timer handling.
                match source {
-                       HTLCSource::OutboundRoute { ref route, .. } => {
+                       HTLCSource::OutboundRoute { ref path, .. } => {
                                log_trace!(self, "Failing outbound payment HTLC with payment_hash {}", log_bytes!(payment_hash.0));
                                mem::drop(channel_state_lock);
                                match &onion_error {
@@ -1792,7 +1795,7 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref> ChannelMan
                                                self.pending_events.lock().unwrap().push(
                                                        events::Event::PaymentFailed {
                                                                payment_hash: payment_hash.clone(),
-                                                               rejected_by_dest: route.hops.len() == 1,
+                                                               rejected_by_dest: path.len() == 1,
 #[cfg(test)]
                                                                error_code: Some(*failure_code),
                                                        }
@@ -1847,6 +1850,9 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref> ChannelMan
        /// privacy-breaking recipient-probing attacks which may reveal payment activity to
        /// motivated attackers.
        ///
+       /// Note that the privacy concerns in (b) are not relevant in payments with a payment_secret
+       /// set. Thus, for such payments we will claim any payments which do not under-pay.
+       ///
        /// May panic if called except in response to a PaymentReceived event.
        pub fn claim_funds(&self, payment_preimage: PaymentPreimage, payment_secret: &Option<PaymentSecret>, expected_amount: u64) -> bool {
                let payment_hash = PaymentHash(Sha256::hash(&payment_preimage.0).into_inner());
@@ -1856,9 +1862,40 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref> ChannelMan
                let mut channel_state = Some(self.channel_state.lock().unwrap());
                let removed_source = channel_state.as_mut().unwrap().claimable_htlcs.remove(&(payment_hash, *payment_secret));
                if let Some(mut sources) = removed_source {
+                       assert!(!sources.is_empty());
+
+                       // If we are claiming an MPP payment, we have to take special care to ensure that each
+                       // channel exists before claiming all of the payments (inside one lock).
+                       // Note that channel existance is sufficient as we should always get a monitor update
+                       // which will take care of the real HTLC claim enforcement.
+                       //
+                       // If we find an HTLC which we would need to claim but for which we do not have a
+                       // channel, we will fail all parts of the MPP payment. While we could wait and see if
+                       // the sender retries the already-failed path(s), it should be a pretty rare case where
+                       // we got all the HTLCs and then a channel closed while we were waiting for the user to
+                       // provide the preimage, so worrying too much about the optimal handling isn't worth
+                       // it.
+
+                       let (is_mpp, mut valid_mpp) = if let &Some(ref data) = &sources[0].payment_data {
+                               assert!(payment_secret.is_some());
+                               (true, data.total_msat >= expected_amount)
+                       } else {
+                               assert!(payment_secret.is_none());
+                               (false, false)
+                       };
+
+                       for htlc in sources.iter() {
+                               if !is_mpp || !valid_mpp { break; }
+                               if let None = channel_state.as_ref().unwrap().short_to_id.get(&htlc.prev_hop.short_channel_id) {
+                                       valid_mpp = false;
+                               }
+                       }
+
+                       let mut errs = Vec::new();
+                       let mut claimed_any_htlcs = false;
                        for htlc in sources.drain(..) {
                                if channel_state.is_none() { channel_state = Some(self.channel_state.lock().unwrap()); }
-                               if htlc.value < expected_amount || htlc.value > expected_amount * 2 {
+                               if (is_mpp && !valid_mpp) || (!is_mpp && (htlc.value < expected_amount || htlc.value > expected_amount * 2)) {
                                        let mut htlc_msat_data = byte_utils::be64_to_array(htlc.value).to_vec();
                                        let mut height_data = byte_utils::be32_to_array(self.latest_block_height.load(Ordering::Acquire) as u32).to_vec();
                                        htlc_msat_data.append(&mut height_data);
@@ -1866,78 +1903,116 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref> ChannelMan
                                                                         HTLCSource::PreviousHopData(htlc.prev_hop), &payment_hash,
                                                                         HTLCFailReason::Reason { failure_code: 0x4000|15, data: htlc_msat_data });
                                } else {
-                                       self.claim_funds_internal(channel_state.take().unwrap(), HTLCSource::PreviousHopData(htlc.prev_hop), payment_preimage);
+                                       match self.claim_funds_from_hop(channel_state.as_mut().unwrap(), htlc.prev_hop, payment_preimage) {
+                                               Err(Some(e)) => {
+                                                       if let msgs::ErrorAction::IgnoreError = e.1.err.action {
+                                                               // We got a temporary failure updating monitor, but will claim the
+                                                               // HTLC when the monitor updating is restored (or on chain).
+                                                               log_error!(self, "Temporary failure claiming HTLC, treating as success: {}", e.1.err.err);
+                                                               claimed_any_htlcs = true;
+                                                       } else { errs.push(e); }
+                                               },
+                                               Err(None) if is_mpp => unreachable!("We already checked for channel existence, we can't fail here!"),
+                                               Err(None) => {
+                                                       log_warn!(self, "Channel we expected to claim an HTLC from was closed.");
+                                               },
+                                               Ok(()) => claimed_any_htlcs = true,
+                                       }
                                }
                        }
-                       true
+
+                       // Now that we've done the entire above loop in one lock, we can handle any errors
+                       // which were generated.
+                       channel_state.take();
+
+                       for (their_node_id, err) in errs.drain(..) {
+                               let res: Result<(), _> = Err(err);
+                               let _ = handle_error!(self, res, their_node_id);
+                       }
+
+                       claimed_any_htlcs
                } else { false }
        }
-       fn claim_funds_internal(&self, mut channel_state_lock: MutexGuard<ChannelHolder<ChanSigner>>, source: HTLCSource, payment_preimage: PaymentPreimage) {
-               let (their_node_id, err) = loop {
-                       match source {
-                               HTLCSource::OutboundRoute { .. } => {
-                                       mem::drop(channel_state_lock);
-                                       let mut pending_events = self.pending_events.lock().unwrap();
-                                       pending_events.push(events::Event::PaymentSent {
-                                               payment_preimage
-                                       });
-                               },
-                               HTLCSource::PreviousHopData(HTLCPreviousHopData { short_channel_id, htlc_id, .. }) => {
-                                       //TODO: Delay the claimed_funds relaying just like we do outbound relay!
-                                       let channel_state = &mut *channel_state_lock;
 
-                                       let chan_id = match channel_state.short_to_id.get(&short_channel_id) {
-                                               Some(chan_id) => chan_id.clone(),
-                                               None => {
-                                                       // TODO: There is probably a channel manager somewhere that needs to
-                                                       // learn the preimage as the channel already hit the chain and that's
-                                                       // why it's missing.
-                                                       return
-                                               }
-                                       };
+       fn claim_funds_from_hop(&self, channel_state_lock: &mut MutexGuard<ChannelHolder<ChanSigner>>, prev_hop: HTLCPreviousHopData, payment_preimage: PaymentPreimage) -> Result<(), Option<(PublicKey, MsgHandleErrInternal)>> {
+               //TODO: Delay the claimed_funds relaying just like we do outbound relay!
+               let channel_state = &mut **channel_state_lock;
+               let chan_id = match channel_state.short_to_id.get(&prev_hop.short_channel_id) {
+                       Some(chan_id) => chan_id.clone(),
+                       None => {
+                               return Err(None)
+                       }
+               };
 
-                                       if let hash_map::Entry::Occupied(mut chan) = channel_state.by_id.entry(chan_id) {
-                                               let was_frozen_for_monitor = chan.get().is_awaiting_monitor_update();
-                                               match chan.get_mut().get_update_fulfill_htlc_and_commit(htlc_id, payment_preimage) {
-                                                       Ok((msgs, monitor_option)) => {
-                                                               if let Some(monitor_update) = monitor_option {
-                                                                       if let Err(e) = self.monitor.update_monitor(chan.get().get_funding_txo().unwrap(), monitor_update) {
-                                                                               if was_frozen_for_monitor {
-                                                                                       assert!(msgs.is_none());
-                                                                               } else {
-                                                                                       break (chan.get().get_their_node_id(), handle_monitor_err!(self, e, channel_state, chan, RAACommitmentOrder::CommitmentFirst, false, msgs.is_some()));
-                                                                               }
-                                                                       }
-                                                               }
-                                                               if let Some((msg, commitment_signed)) = msgs {
-                                                                       channel_state.pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs {
-                                                                               node_id: chan.get().get_their_node_id(),
-                                                                               updates: msgs::CommitmentUpdate {
-                                                                                       update_add_htlcs: Vec::new(),
-                                                                                       update_fulfill_htlcs: vec![msg],
-                                                                                       update_fail_htlcs: Vec::new(),
-                                                                                       update_fail_malformed_htlcs: Vec::new(),
-                                                                                       update_fee: None,
-                                                                                       commitment_signed,
-                                                                               }
-                                                                       });
-                                                               }
-                                                       },
-                                                       Err(_e) => {
-                                                               // TODO: There is probably a channel manager somewhere that needs to
-                                                               // learn the preimage as the channel may be about to hit the chain.
-                                                               //TODO: Do something with e?
-                                                               return
-                                                       },
+               if let hash_map::Entry::Occupied(mut chan) = channel_state.by_id.entry(chan_id) {
+                       let was_frozen_for_monitor = chan.get().is_awaiting_monitor_update();
+                       match chan.get_mut().get_update_fulfill_htlc_and_commit(prev_hop.htlc_id, payment_preimage) {
+                               Ok((msgs, monitor_option)) => {
+                                       if let Some(monitor_update) = monitor_option {
+                                               if let Err(e) = self.monitor.update_monitor(chan.get().get_funding_txo().unwrap(), monitor_update) {
+                                                       if was_frozen_for_monitor {
+                                                               assert!(msgs.is_none());
+                                                       } else {
+                                                               return Err(Some((chan.get().get_their_node_id(), handle_monitor_err!(self, e, channel_state, chan, RAACommitmentOrder::CommitmentFirst, false, msgs.is_some()).unwrap_err())));
+                                                       }
                                                }
-                                       } else { unreachable!(); }
+                                       }
+                                       if let Some((msg, commitment_signed)) = msgs {
+                                               channel_state.pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs {
+                                                       node_id: chan.get().get_their_node_id(),
+                                                       updates: msgs::CommitmentUpdate {
+                                                               update_add_htlcs: Vec::new(),
+                                                               update_fulfill_htlcs: vec![msg],
+                                                               update_fail_htlcs: Vec::new(),
+                                                               update_fail_malformed_htlcs: Vec::new(),
+                                                               update_fee: None,
+                                                               commitment_signed,
+                                                       }
+                                               });
+                                       }
+                                       return Ok(())
+                               },
+                               Err(e) => {
+                                       // TODO: Do something with e?
+                                       // This should only occur if we are claiming an HTLC at the same time as the
+                                       // HTLC is being failed (eg because a block is being connected and this caused
+                                       // an HTLC to time out). This should, of course, only occur if the user is the
+                                       // one doing the claiming (as it being a part of a peer claim would imply we're
+                                       // about to lose funds) and only if the lock in claim_funds was dropped as a
+                                       // previous HTLC was failed (thus not for an MPP payment).
+                                       debug_assert!(false, "This shouldn't be reachable except in absurdly rare cases between monitor updates and HTLC timeouts: {:?}", e);
+                                       return Err(None)
                                },
                        }
-                       return;
-               };
+               } else { unreachable!(); }
+       }
 
-               mem::drop(channel_state_lock);
-               let _ = handle_error!(self, err, their_node_id);
+       fn claim_funds_internal(&self, mut channel_state_lock: MutexGuard<ChannelHolder<ChanSigner>>, source: HTLCSource, payment_preimage: PaymentPreimage) {
+               match source {
+                       HTLCSource::OutboundRoute { .. } => {
+                               mem::drop(channel_state_lock);
+                               let mut pending_events = self.pending_events.lock().unwrap();
+                               pending_events.push(events::Event::PaymentSent {
+                                       payment_preimage
+                               });
+                       },
+                       HTLCSource::PreviousHopData(hop_data) => {
+                               if let Err((their_node_id, err)) = match self.claim_funds_from_hop(&mut channel_state_lock, hop_data, payment_preimage) {
+                                       Ok(()) => Ok(()),
+                                       Err(None) => {
+                                               // TODO: There is probably a channel monitor somewhere that needs to
+                                               // learn the preimage as the channel already hit the chain and that's
+                                               // why it's missing.
+                                               Ok(())
+                                       },
+                                       Err(Some(res)) => Err(res),
+                               } {
+                                       mem::drop(channel_state_lock);
+                                       let res: Result<(), _> = Err(err);
+                                       let _ = handle_error!(self, res, their_node_id);
+                               }
+                       },
+               }
        }
 
        /// Gets the node_id held by this ChannelManager
@@ -3271,9 +3346,9 @@ impl Writeable for HTLCSource {
                                0u8.write(writer)?;
                                hop_data.write(writer)?;
                        },
-                       &HTLCSource::OutboundRoute { ref route, ref session_priv, ref first_hop_htlc_msat } => {
+                       &HTLCSource::OutboundRoute { ref path, ref session_priv, ref first_hop_htlc_msat } => {
                                1u8.write(writer)?;
-                               route.write(writer)?;
+                               path.write(writer)?;
                                session_priv.write(writer)?;
                                first_hop_htlc_msat.write(writer)?;
                        }
@@ -3287,7 +3362,7 @@ impl Readable for HTLCSource {
                match <u8 as Readable>::read(reader)? {
                        0 => Ok(HTLCSource::PreviousHopData(Readable::read(reader)?)),
                        1 => Ok(HTLCSource::OutboundRoute {
-                               route: Readable::read(reader)?,
+                               path: Readable::read(reader)?,
                                session_priv: Readable::read(reader)?,
                                first_hop_htlc_msat: Readable::read(reader)?,
                        }),