Refactor payment-claim logic to ensure MPP-claim atomicity
author Matt Corallo <git@bluematt.me>
Mon, 30 Mar 2020 20:24:19 +0000 (16:24 -0400)
committer Matt Corallo <git@bluematt.me>
Wed, 15 Apr 2020 00:50:41 +0000 (20:50 -0400)
Previously, if we claimed an MPP payment where a previous-hop channel
was closed while we were waiting for the user to provide us the
preimage, we'd simply skip claiming that HTLC without letting the
user know.

This refactors the claim logic to first check that all the channels
are still available (which is all we need, as we mostly care about
updating the channel monitors, not the channels themselves) and then
claim the HTLCs under the same lock, ensuring atomicity.
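
As a minimal sketch of the pattern (the types and names below are
simplified stand-ins for illustration, not the real ChannelManager
API), the existence check and the claims now run under a single lock
acquisition:

    use std::collections::HashMap;
    use std::sync::Mutex;

    // Simplified stand-ins for the real ChannelManager state.
    struct HtlcPart { short_channel_id: u64, amount_msat: u64 }
    struct ChannelState { short_to_id: HashMap<u64, [u8; 32]> }
    struct Claimer { channel_state: Mutex<ChannelState> }

    impl Claimer {
        // Claims all parts of an MPP payment, or none of them: the channel
        // existence check and the claims happen under one lock acquisition,
        // so no channel can disappear between the check and the claim.
        fn claim_mpp(&self, parts: &[HtlcPart]) -> bool {
            let state = self.channel_state.lock().unwrap();
            // Phase 1: every previous-hop channel must still be known.
            if !parts.iter().all(|p| state.short_to_id.contains_key(&p.short_channel_id)) {
                return false; // fail the whole payment, never a silent partial claim
            }
            // Phase 2: claim each part while still holding the same lock.
            for part in parts {
                // ...issue the fulfill and monitor update for `part` here...
                let _ = part.amount_msat;
            }
            true
        }
    }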

lightning/src/ln/chanmon_update_fail_tests.rs
lightning/src/ln/channelmanager.rs

index 4a682f74fa1056977ac30270ba91ce6aea0a7cec..289fd80679cadb83e976bd62ecba2d10b9d62ad2 100644
@@ -1458,7 +1458,7 @@ fn test_monitor_update_fail_claim() {
        nodes[1].node.handle_update_add_htlc(&nodes[2].node.get_our_node_id(), &payment_event.msgs[0]);
        let events = nodes[1].node.get_and_clear_pending_msg_events();
        assert_eq!(events.len(), 0);
-       nodes[1].logger.assert_log("lightning::ln::channelmanager".to_string(), "Failed to update ChannelMonitor".to_string(), 1);
+       nodes[1].logger.assert_log("lightning::ln::channelmanager".to_string(), "Temporary failure claiming HTLC, treating as success: Failed to update ChannelMonitor".to_string(), 1);
        commitment_signed_dance!(nodes[1], nodes[2], payment_event.commitment_msg, false, true);
 
        let bs_fail_update = get_htlc_update_msgs!(nodes[1], nodes[2].node.get_our_node_id());
@@ -1599,7 +1599,7 @@ fn monitor_update_claim_fail_no_response() {
        check_added_monitors!(nodes[1], 1);
        let events = nodes[1].node.get_and_clear_pending_msg_events();
        assert_eq!(events.len(), 0);
-       nodes[1].logger.assert_log("lightning::ln::channelmanager".to_string(), "Failed to update ChannelMonitor".to_string(), 1);
+       nodes[1].logger.assert_log("lightning::ln::channelmanager".to_string(), "Temporary failure claiming HTLC, treating as success: Failed to update ChannelMonitor".to_string(), 1);
 
        *nodes[1].chan_monitor.update_ret.lock().unwrap() = Ok(());
        let (outpoint, latest_update) = nodes[1].chan_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone();
index d1395e1271c3d5bc130d53c2cb29e561d4dc7f16..918f8b28f37304c319e065f4b89fda05f3b0de98 100644
@@ -1850,6 +1850,9 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref> ChannelMan
        /// privacy-breaking recipient-probing attacks which may reveal payment activity to
        /// motivated attackers.
        ///
+       /// Note that the privacy concerns in (b) are not relevant in payments with a payment_secret
+       /// set. Thus, for such payments we will claim any payment which does not under-pay.
+       ///
        /// May panic if called except in response to a PaymentReceived event.
        pub fn claim_funds(&self, payment_preimage: PaymentPreimage, payment_secret: &Option<PaymentSecret>, expected_amount: u64) -> bool {
                let payment_hash = PaymentHash(Sha256::hash(&payment_preimage.0).into_inner());
@@ -1860,18 +1863,39 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref> ChannelMan
                let removed_source = channel_state.as_mut().unwrap().claimable_htlcs.remove(&(payment_hash, *payment_secret));
                if let Some(mut sources) = removed_source {
                        assert!(!sources.is_empty());
-                       let valid_mpp_amount = if let &Some(ref data) = &sources[0].payment_data {
+
+                       // If we are claiming an MPP payment, we have to take special care to ensure that each
+                       // channel exists before claiming all of the payments (inside one lock).
+                       // Note that channel existence is sufficient as we should always get a monitor update
+                       // which will take care of the real HTLC claim enforcement.
+                       //
+                       // If we find an HTLC which we would need to claim but for which we do not have a
+                       // channel, we will fail all parts of the MPP payment. While we could wait and see if
+                       // the sender retries the already-failed path(s), it should be a pretty rare case where
+                       // we got all the HTLCs and then a channel closed while we were waiting for the user to
+                       // provide the preimage, so worrying too much about the optimal handling isn't worth
+                       // it.
+
+                       let (is_mpp, mut valid_mpp) = if let &Some(ref data) = &sources[0].payment_data {
                                assert!(payment_secret.is_some());
-                               data.total_msat == expected_amount
+                               (true, data.total_msat >= expected_amount)
                        } else {
                                assert!(payment_secret.is_none());
-                               false
+                               (false, false)
                        };
 
+                       for htlc in sources.iter() {
+                               if !is_mpp || !valid_mpp { break; }
+                               if let None = channel_state.as_ref().unwrap().short_to_id.get(&htlc.prev_hop.short_channel_id) {
+                                       valid_mpp = false;
+                               }
+                       }
+
+                       let mut errs = Vec::new();
                        let mut claimed_any_htlcs = false;
                        for htlc in sources.drain(..) {
                                if channel_state.is_none() { channel_state = Some(self.channel_state.lock().unwrap()); }
-                               if !valid_mpp_amount && (htlc.value < expected_amount || htlc.value > expected_amount * 2) {
+                               if (is_mpp && !valid_mpp) || (!is_mpp && (htlc.value < expected_amount || htlc.value > expected_amount * 2)) {
                                        let mut htlc_msat_data = byte_utils::be64_to_array(htlc.value).to_vec();
                                        let mut height_data = byte_utils::be32_to_array(self.latest_block_height.load(Ordering::Acquire) as u32).to_vec();
                                        htlc_msat_data.append(&mut height_data);
@@ -1879,79 +1903,116 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref> ChannelMan
                                                                         HTLCSource::PreviousHopData(htlc.prev_hop), &payment_hash,
                                                                         HTLCFailReason::Reason { failure_code: 0x4000|15, data: htlc_msat_data });
                                } else {
-                                       self.claim_funds_internal(channel_state.take().unwrap(), HTLCSource::PreviousHopData(htlc.prev_hop), payment_preimage);
-                                       claimed_any_htlcs = true;
+                                       match self.claim_funds_from_hop(channel_state.as_mut().unwrap(), htlc.prev_hop, payment_preimage) {
+                                               Err(Some(e)) => {
+                                                       if let msgs::ErrorAction::IgnoreError = e.1.err.action {
+                                                               // We got a temporary failure updating monitor, but will claim the
+                                                               // HTLC when the monitor updating is restored (or on chain).
+                                                               log_error!(self, "Temporary failure claiming HTLC, treating as success: {}", e.1.err.err);
+                                                               claimed_any_htlcs = true;
+                                                       } else { errs.push(e); }
+                                               },
+                                               Err(None) if is_mpp => unreachable!("We already checked for channel existence, we can't fail here!"),
+                                               Err(None) => {
+                                                       log_warn!(self, "Channel we expected to claim an HTLC from was closed.");
+                                               },
+                                               Ok(()) => claimed_any_htlcs = true,
+                                       }
                                }
                        }
+
+                       // Now that we've done the entire above loop in one lock, we can handle any errors
+                       // which were generated.
+                       channel_state.take();
+
+                       for (their_node_id, err) in errs.drain(..) {
+                               let res: Result<(), _> = Err(err);
+                               let _ = handle_error!(self, res, their_node_id);
+                       }
+
                        claimed_any_htlcs
                } else { false }
        }
-       fn claim_funds_internal(&self, mut channel_state_lock: MutexGuard<ChannelHolder<ChanSigner>>, source: HTLCSource, payment_preimage: PaymentPreimage) {
-               let (their_node_id, err) = loop {
-                       match source {
-                               HTLCSource::OutboundRoute { .. } => {
-                                       mem::drop(channel_state_lock);
-                                       let mut pending_events = self.pending_events.lock().unwrap();
-                                       pending_events.push(events::Event::PaymentSent {
-                                               payment_preimage
-                                       });
-                               },
-                               HTLCSource::PreviousHopData(HTLCPreviousHopData { short_channel_id, htlc_id, .. }) => {
-                                       //TODO: Delay the claimed_funds relaying just like we do outbound relay!
-                                       let channel_state = &mut *channel_state_lock;
 
-                                       let chan_id = match channel_state.short_to_id.get(&short_channel_id) {
-                                               Some(chan_id) => chan_id.clone(),
-                                               None => {
-                                                       // TODO: There is probably a channel manager somewhere that needs to
-                                                       // learn the preimage as the channel already hit the chain and that's
-                                                       // why it's missing.
-                                                       return
-                                               }
-                                       };
+       fn claim_funds_from_hop(&self, channel_state_lock: &mut MutexGuard<ChannelHolder<ChanSigner>>, prev_hop: HTLCPreviousHopData, payment_preimage: PaymentPreimage) -> Result<(), Option<(PublicKey, MsgHandleErrInternal)>> {
+               //TODO: Delay the claimed_funds relaying just like we do outbound relay!
+               let channel_state = &mut **channel_state_lock;
+               let chan_id = match channel_state.short_to_id.get(&prev_hop.short_channel_id) {
+                       Some(chan_id) => chan_id.clone(),
+                       None => {
+                               return Err(None)
+                       }
+               };
 
-                                       if let hash_map::Entry::Occupied(mut chan) = channel_state.by_id.entry(chan_id) {
-                                               let was_frozen_for_monitor = chan.get().is_awaiting_monitor_update();
-                                               match chan.get_mut().get_update_fulfill_htlc_and_commit(htlc_id, payment_preimage) {
-                                                       Ok((msgs, monitor_option)) => {
-                                                               if let Some(monitor_update) = monitor_option {
-                                                                       if let Err(e) = self.monitor.update_monitor(chan.get().get_funding_txo().unwrap(), monitor_update) {
-                                                                               if was_frozen_for_monitor {
-                                                                                       assert!(msgs.is_none());
-                                                                               } else {
-                                                                                       break (chan.get().get_their_node_id(), handle_monitor_err!(self, e, channel_state, chan, RAACommitmentOrder::CommitmentFirst, false, msgs.is_some()));
-                                                                               }
-                                                                       }
-                                                               }
-                                                               if let Some((msg, commitment_signed)) = msgs {
-                                                                       channel_state.pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs {
-                                                                               node_id: chan.get().get_their_node_id(),
-                                                                               updates: msgs::CommitmentUpdate {
-                                                                                       update_add_htlcs: Vec::new(),
-                                                                                       update_fulfill_htlcs: vec![msg],
-                                                                                       update_fail_htlcs: Vec::new(),
-                                                                                       update_fail_malformed_htlcs: Vec::new(),
-                                                                                       update_fee: None,
-                                                                                       commitment_signed,
-                                                                               }
-                                                                       });
-                                                               }
-                                                       },
-                                                       Err(_e) => {
-                                                               // TODO: There is probably a channel manager somewhere that needs to
-                                                               // learn the preimage as the channel may be about to hit the chain.
-                                                               //TODO: Do something with e?
-                                                               return
-                                                       },
+               if let hash_map::Entry::Occupied(mut chan) = channel_state.by_id.entry(chan_id) {
+                       let was_frozen_for_monitor = chan.get().is_awaiting_monitor_update();
+                       match chan.get_mut().get_update_fulfill_htlc_and_commit(prev_hop.htlc_id, payment_preimage) {
+                               Ok((msgs, monitor_option)) => {
+                                       if let Some(monitor_update) = monitor_option {
+                                               if let Err(e) = self.monitor.update_monitor(chan.get().get_funding_txo().unwrap(), monitor_update) {
+                                                       if was_frozen_for_monitor {
+                                                               assert!(msgs.is_none());
+                                                       } else {
+                                                               return Err(Some((chan.get().get_their_node_id(), handle_monitor_err!(self, e, channel_state, chan, RAACommitmentOrder::CommitmentFirst, false, msgs.is_some()).unwrap_err())));
+                                                       }
                                                }
-                                       } else { unreachable!(); }
+                                       }
+                                       if let Some((msg, commitment_signed)) = msgs {
+                                               channel_state.pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs {
+                                                       node_id: chan.get().get_their_node_id(),
+                                                       updates: msgs::CommitmentUpdate {
+                                                               update_add_htlcs: Vec::new(),
+                                                               update_fulfill_htlcs: vec![msg],
+                                                               update_fail_htlcs: Vec::new(),
+                                                               update_fail_malformed_htlcs: Vec::new(),
+                                                               update_fee: None,
+                                                               commitment_signed,
+                                                       }
+                                               });
+                                       }
+                                       return Ok(())
+                               },
+                               Err(e) => {
+                                       // TODO: Do something with e?
+                                       // This should only occur if we are claiming an HTLC at the same time as the
+                                       // HTLC is being failed (eg because a block is being connected and this caused
+                                       // an HTLC to time out). This should, of course, only occur if the user is the
+                                       // one doing the claiming (as it being a part of a peer claim would imply we're
+                                       // about to lose funds) and only if the lock in claim_funds was dropped as a
+                                       // previous HTLC was failed (thus not for an MPP payment).
+                                       debug_assert!(false, "This shouldn't be reachable except in absurdly rare cases between monitor updates and HTLC timeouts: {:?}", e);
+                                       return Err(None)
                                },
                        }
-                       return;
-               };
+               } else { unreachable!(); }
+       }
 
-               mem::drop(channel_state_lock);
-               let _ = handle_error!(self, err, their_node_id);
+       fn claim_funds_internal(&self, mut channel_state_lock: MutexGuard<ChannelHolder<ChanSigner>>, source: HTLCSource, payment_preimage: PaymentPreimage) {
+               match source {
+                       HTLCSource::OutboundRoute { .. } => {
+                               mem::drop(channel_state_lock);
+                               let mut pending_events = self.pending_events.lock().unwrap();
+                               pending_events.push(events::Event::PaymentSent {
+                                       payment_preimage
+                               });
+                       },
+                       HTLCSource::PreviousHopData(hop_data) => {
+                               if let Err((their_node_id, err)) = match self.claim_funds_from_hop(&mut channel_state_lock, hop_data, payment_preimage) {
+                                       Ok(()) => Ok(()),
+                                       Err(None) => {
+                                               // TODO: There is probably a channel monitor somewhere that needs to
+                                               // learn the preimage as the channel already hit the chain and that's
+                                               // why it's missing.
+                                               Ok(())
+                                       },
+                                       Err(Some(res)) => Err(res),
+                               } {
+                                       mem::drop(channel_state_lock);
+                                       let res: Result<(), _> = Err(err);
+                                       let _ = handle_error!(self, res, their_node_id);
+                               }
+                       },
+               }
        }
 
        /// Gets the node_id held by this ChannelManager
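
For reference, the new helper's return type encodes three distinct
outcomes, and claim_funds now defers surfacing errors until the
channel_state lock is dropped. A hedged sketch of that convention
(hypothetical simplified types standing in for PublicKey and
MsgHandleErrInternal):

    // Hypothetical mirror of the claim_funds_from_hop contract:
    //   Ok(())       -> HTLC claimed (or its claim queued via a monitor update)
    //   Err(None)    -> previous-hop channel is gone; benign outside MPP
    //   Err(Some(e)) -> a (node_id, error) pair to handle after dropping the lock
    fn handle_claim_result(res: Result<(), Option<(u64, String)>>) {
        match res {
            Ok(()) => { /* claimed_any_htlcs = true */ },
            Err(None) => { /* the channel monitor will claim on chain */ },
            Err(Some((node_id, err))) => {
                // Collect into `errs` and only surface via handle_error!
                // after the channel_state lock has been released.
                let _ = (node_id, err);
            },
        }
    }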