Handle claim result event generation in claim_funds_from_hop
authorMatt Corallo <git@bluematt.me>
Tue, 6 Dec 2022 21:01:50 +0000 (21:01 +0000)
committerMatt Corallo <git@bluematt.me>
Thu, 8 Dec 2022 21:24:26 +0000 (21:24 +0000)
Currently `claim_funds` and `claim_funds_internal` call
`claim_funds_from_hop` and then surface and `Event` to the user
informing them of the forwarded/claimed payment based on it's
result. In both places we assume that a claim "completed" even if
a monitor update is being done async.

Instead, here we push that event generation through a
`MonitorUpdateCompletionAction` and a call to
`handle_monitor_update_completion_action`. This will allow us to
hold the event(s) until async monitor updates complete in the
future.

lightning/src/ln/channelmanager.rs

index b0742ab051b02e817506c6b2d18e8704bd6bc23e..ecc30fd12b5fb2dbc6a3435e66ad56e322c6f3c6 100644 (file)
@@ -4300,7 +4300,6 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F
                let mut expected_amt_msat = None;
                let mut valid_mpp = true;
                let mut errs = Vec::new();
-               let mut claimed_any_htlcs = false;
                let mut channel_state = Some(self.channel_state.lock().unwrap());
                for htlc in sources.iter() {
                        let chan_id = match self.short_to_chan_info.read().unwrap().get(&htlc.prev_hop.short_channel_id) {
@@ -4352,13 +4351,14 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F
                if valid_mpp {
                        for htlc in sources.drain(..) {
                                if channel_state.is_none() { channel_state = Some(self.channel_state.lock().unwrap()); }
-                               match self.claim_funds_from_hop(channel_state.take().unwrap(), htlc.prev_hop, payment_preimage) {
+                               match self.claim_funds_from_hop(channel_state.take().unwrap(), htlc.prev_hop, payment_preimage,
+                                       |_| Some(MonitorUpdateCompletionAction::PaymentClaimed { payment_hash }))
+                               {
                                        ClaimFundsFromHop::MonitorUpdateFail(pk, err, _) => {
                                                if let msgs::ErrorAction::IgnoreError = err.err.action {
                                                        // We got a temporary failure updating monitor, but will claim the
                                                        // HTLC when the monitor updating is restored (or on chain).
                                                        log_error!(self.logger, "Temporary failure claiming HTLC, treating as success: {}", err.err.err);
-                                                       claimed_any_htlcs = true;
                                                } else { errs.push((pk, err)); }
                                        },
                                        ClaimFundsFromHop::PrevHopForceClosed => {
@@ -4373,7 +4373,7 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F
                                                // available to be claimed. Thus, it does not make sense to set
                                                // `claimed_any_htlcs`.
                                        },
-                                       ClaimFundsFromHop::Success(_) => claimed_any_htlcs = true,
+                                       ClaimFundsFromHop::Success(_) => {},
                                }
                        }
                }
@@ -4387,14 +4387,7 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F
                                let receiver = HTLCDestination::FailedPayment { payment_hash };
                                self.fail_htlc_backwards_internal(&source, &payment_hash, &reason, receiver);
                        }
-               }
-
-               let ClaimingPayment { amount_msat, payment_purpose: purpose, receiver_node_id } =
-                       self.claimable_payments.lock().unwrap().pending_claiming_payments.remove(&payment_hash).unwrap();
-               if claimed_any_htlcs {
-                       self.pending_events.lock().unwrap().push(events::Event::PaymentClaimed {
-                               payment_hash, purpose, amount_msat, receiver_node_id: Some(receiver_node_id),
-                       });
+                       self.claimable_payments.lock().unwrap().pending_claiming_payments.remove(&payment_hash);
                }
 
                // Now we can handle any errors which were generated.
@@ -4404,12 +4397,16 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F
                }
        }
 
-       fn claim_funds_from_hop(&self, mut channel_state_lock: MutexGuard<ChannelHolder<<K::Target as KeysInterface>::Signer>>, prev_hop: HTLCPreviousHopData, payment_preimage: PaymentPreimage) -> ClaimFundsFromHop {
+       fn claim_funds_from_hop<ComplFunc: FnOnce(Option<u64>) -> Option<MonitorUpdateCompletionAction>>(&self,
+               mut channel_state_lock: MutexGuard<ChannelHolder<<K::Target as KeysInterface>::Signer>>,
+               prev_hop: HTLCPreviousHopData, payment_preimage: PaymentPreimage, completion_action: ComplFunc)
+       -> ClaimFundsFromHop {
                //TODO: Delay the claimed_funds relaying just like we do outbound relay!
 
                let chan_id = prev_hop.outpoint.to_channel_id();
                let channel_state = &mut *channel_state_lock;
                if let hash_map::Entry::Occupied(mut chan) = channel_state.by_id.entry(chan_id) {
+                       let counterparty_node_id = chan.get().get_counterparty_node_id();
                        match chan.get_mut().get_update_fulfill_htlc_and_commit(prev_hop.htlc_id, payment_preimage, &self.logger) {
                                Ok(msgs_monitor_option) => {
                                        if let UpdateFulfillCommitFetch::NewClaim { msgs, htlc_value_msat, monitor_update } = msgs_monitor_option {
@@ -4419,10 +4416,11 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F
                                                                log_given_level!(self.logger, if e == ChannelMonitorUpdateStatus::PermanentFailure { Level::Error } else { Level::Debug },
                                                                        "Failed to update channel monitor with preimage {:?}: {:?}",
                                                                        payment_preimage, e);
+                                                               let err = handle_monitor_update_res!(self, e, chan, RAACommitmentOrder::CommitmentFirst, false, msgs.is_some()).unwrap_err();
+                                                               mem::drop(channel_state_lock);
+                                                               self.handle_monitor_update_completion_actions(completion_action(Some(htlc_value_msat)));
                                                                return ClaimFundsFromHop::MonitorUpdateFail(
-                                                                       chan.get().get_counterparty_node_id(),
-                                                                       handle_monitor_update_res!(self, e, chan, RAACommitmentOrder::CommitmentFirst, false, msgs.is_some()).unwrap_err(),
-                                                                       Some(htlc_value_msat)
+                                                                       counterparty_node_id, err, Some(htlc_value_msat)
                                                                );
                                                        }
                                                }
@@ -4441,6 +4439,8 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F
                                                                }
                                                        });
                                                }
+                                               mem::drop(channel_state_lock);
+                                               self.handle_monitor_update_completion_actions(completion_action(Some(htlc_value_msat)));
                                                return ClaimFundsFromHop::Success(htlc_value_msat);
                                        } else {
                                                return ClaimFundsFromHop::DuplicateClaim;
@@ -4455,11 +4455,12 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F
                                                                payment_preimage, e);
                                                },
                                        }
-                                       let counterparty_node_id = chan.get().get_counterparty_node_id();
                                        let (drop, res) = convert_chan_err!(self, e, chan.get_mut(), &chan_id);
                                        if drop {
                                                chan.remove_entry();
                                        }
+                                       mem::drop(channel_state_lock);
+                                       self.handle_monitor_update_completion_actions(completion_action(None));
                                        return ClaimFundsFromHop::MonitorUpdateFail(counterparty_node_id, res, None);
                                },
                        }
@@ -4481,6 +4482,13 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F
                                log_error!(self.logger, "Critical error: failed to update channel monitor with preimage {:?}: {:?}",
                                        payment_preimage, update_res);
                        }
+                       mem::drop(channel_state_lock);
+                       // Note that we do process the completion action here. This totally could be a
+                       // duplicate claim, but we have no way of knowing without interrogating the
+                       // `ChannelMonitor` we've provided the above update to. Instead, note that `Event`s are
+                       // generally always allowed to be duplicative (and it's specifically noted in
+                       // `PaymentForwarded`).
+                       self.handle_monitor_update_completion_actions(completion_action(None));
                        return ClaimFundsFromHop::PrevHopForceClosed
                }
        }
@@ -4555,43 +4563,28 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F
                        },
                        HTLCSource::PreviousHopData(hop_data) => {
                                let prev_outpoint = hop_data.outpoint;
-                               let res = self.claim_funds_from_hop(channel_state_lock, hop_data, payment_preimage);
-                               let claimed_htlc = if let ClaimFundsFromHop::DuplicateClaim = res { false } else { true };
-                               let htlc_claim_value_msat = match res {
-                                       ClaimFundsFromHop::MonitorUpdateFail(_, _, amt_opt) => amt_opt,
-                                       ClaimFundsFromHop::Success(amt) => Some(amt),
-                                       _ => None,
-                               };
-                               if let ClaimFundsFromHop::PrevHopForceClosed = res {
-                                       // Note that we do *not* set `claimed_htlc` to false here. In fact, this
-                                       // totally could be a duplicate claim, but we have no way of knowing
-                                       // without interrogating the `ChannelMonitor` we've provided the above
-                                       // update to. Instead, we simply document in `PaymentForwarded` that this
-                                       // can happen.
-                               }
+                               let res = self.claim_funds_from_hop(channel_state_lock, hop_data, payment_preimage,
+                                       |htlc_claim_value_msat| {
+                                               if let Some(forwarded_htlc_value) = forwarded_htlc_value_msat {
+                                                       let fee_earned_msat = if let Some(claimed_htlc_value) = htlc_claim_value_msat {
+                                                               Some(claimed_htlc_value - forwarded_htlc_value)
+                                                       } else { None };
+
+                                                       let prev_channel_id = Some(prev_outpoint.to_channel_id());
+                                                       let next_channel_id = Some(next_channel_id);
+
+                                                       Some(MonitorUpdateCompletionAction::EmitEvent { event: events::Event::PaymentForwarded {
+                                                               fee_earned_msat,
+                                                               claim_from_onchain_tx: from_onchain,
+                                                               prev_channel_id,
+                                                               next_channel_id,
+                                                       }})
+                                               } else { None }
+                                       });
                                if let ClaimFundsFromHop::MonitorUpdateFail(pk, err, _) = res {
                                        let result: Result<(), _> = Err(err);
                                        let _ = handle_error!(self, result, pk);
                                }
-
-                               if claimed_htlc {
-                                       if let Some(forwarded_htlc_value) = forwarded_htlc_value_msat {
-                                               let fee_earned_msat = if let Some(claimed_htlc_value) = htlc_claim_value_msat {
-                                                       Some(claimed_htlc_value - forwarded_htlc_value)
-                                               } else { None };
-
-                                               let mut pending_events = self.pending_events.lock().unwrap();
-                                               let prev_channel_id = Some(prev_outpoint.to_channel_id());
-                                               let next_channel_id = Some(next_channel_id);
-
-                                               pending_events.push(events::Event::PaymentForwarded {
-                                                       fee_earned_msat,
-                                                       claim_from_onchain_tx: from_onchain,
-                                                       prev_channel_id,
-                                                       next_channel_id,
-                                               });
-                                       }
-                               }
                        },
                }
        }