Inform ChannelManager when fulfilled HTLCs are finalized
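When a ChannelMonitorUpdate completes or fails, the ChannelManager now learns which fulfilled HTLCs have been finalized, in addition to which HTLCs to forward or fail. Concretely, `handle_monitor_err!` and `Channel::monitor_update_failed` gain a `$failed_finalized_fulfills` list of `HTLCSource`s to replay after a temporary monitor failure, `revoke_and_ack` and `monitor_updating_restored` now return structs carrying a `finalized_claimed_htlcs` field, `channel_monitor_updated` becomes a private method driven by the new `MonitorEvent::UpdateCompleted` event, `MonitorEvent::UpdateFailed` closes the channel with `ClosureReason::ProcessingError`, and the `ChannelManagerReadArgs` documentation gains a step requiring that `ChannelMonitor`s be re-persisted before being moved into the `chain::Watch`.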
diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs
index 5b29828ebaad09042dd65923512569ce47a31ef8..019d61c57c09a2d3e8f0606a3353a49198cc5425 100644
--- a/lightning/src/ln/channelmanager.rs
+++ b/lightning/src/ln/channelmanager.rs
@@ -1002,7 +1002,7 @@ macro_rules! handle_monitor_err {
        ($self: ident, $err: expr, $channel_state: expr, $entry: expr, $action_type: path, $resend_raa: expr, $resend_commitment: expr) => {
                handle_monitor_err!($self, $err, $channel_state, $entry, $action_type, $resend_raa, $resend_commitment, Vec::new(), Vec::new())
        };
-       ($self: ident, $err: expr, $short_to_id: expr, $chan: expr, $action_type: path, $resend_raa: expr, $resend_commitment: expr, $failed_forwards: expr, $failed_fails: expr, $chan_id: expr) => {
+       ($self: ident, $err: expr, $short_to_id: expr, $chan: expr, $action_type: path, $resend_raa: expr, $resend_commitment: expr, $failed_forwards: expr, $failed_fails: expr, $failed_finalized_fulfills: expr, $chan_id: expr) => {
                match $err {
                        ChannelMonitorUpdateErr::PermanentFailure => {
                                log_error!($self.logger, "Closing channel {} due to monitor update ChannelMonitorUpdateErr::PermanentFailure", log_bytes!($chan_id[..]));
@@ -1023,7 +1023,7 @@ macro_rules! handle_monitor_err {
                                (res, true)
                        },
                        ChannelMonitorUpdateErr::TemporaryFailure => {
-                               log_info!($self.logger, "Disabling channel {} due to monitor update TemporaryFailure. On restore will send {} and process {} forwards and {} fails",
+                               log_info!($self.logger, "Disabling channel {} due to monitor update TemporaryFailure. On restore will send {} and process {} forwards, {} fails, and {} fulfill finalizations",
                                                log_bytes!($chan_id[..]),
                                                if $resend_commitment && $resend_raa {
                                                                match $action_type {
@@ -1034,25 +1034,29 @@ macro_rules! handle_monitor_err {
                                                        else if $resend_raa { "RAA" }
                                                        else { "nothing" },
                                                (&$failed_forwards as &Vec<(PendingHTLCInfo, u64)>).len(),
-                                               (&$failed_fails as &Vec<(HTLCSource, PaymentHash, HTLCFailReason)>).len());
+                                               (&$failed_fails as &Vec<(HTLCSource, PaymentHash, HTLCFailReason)>).len(),
+                                               (&$failed_finalized_fulfills as &Vec<HTLCSource>).len());
                                if !$resend_commitment {
                                        debug_assert!($action_type == RAACommitmentOrder::RevokeAndACKFirst || !$resend_raa);
                                }
                                if !$resend_raa {
                                        debug_assert!($action_type == RAACommitmentOrder::CommitmentFirst || !$resend_commitment);
                                }
-                               $chan.monitor_update_failed($resend_raa, $resend_commitment, $failed_forwards, $failed_fails);
+                               $chan.monitor_update_failed($resend_raa, $resend_commitment, $failed_forwards, $failed_fails, $failed_finalized_fulfills);
                                (Err(MsgHandleErrInternal::from_chan_no_close(ChannelError::Ignore("Failed to update ChannelMonitor".to_owned()), *$chan_id)), false)
                        },
                }
        };
-       ($self: ident, $err: expr, $channel_state: expr, $entry: expr, $action_type: path, $resend_raa: expr, $resend_commitment: expr, $failed_forwards: expr, $failed_fails: expr) => { {
-               let (res, drop) = handle_monitor_err!($self, $err, $channel_state.short_to_id, $entry.get_mut(), $action_type, $resend_raa, $resend_commitment, $failed_forwards, $failed_fails, $entry.key());
+       ($self: ident, $err: expr, $channel_state: expr, $entry: expr, $action_type: path, $resend_raa: expr, $resend_commitment: expr, $failed_forwards: expr, $failed_fails: expr, $failed_finalized_fulfills: expr) => { {
+               let (res, drop) = handle_monitor_err!($self, $err, $channel_state.short_to_id, $entry.get_mut(), $action_type, $resend_raa, $resend_commitment, $failed_forwards, $failed_fails, $failed_finalized_fulfills, $entry.key());
                if drop {
                        $entry.remove_entry();
                }
                res
        } };
+       ($self: ident, $err: expr, $channel_state: expr, $entry: expr, $action_type: path, $resend_raa: expr, $resend_commitment: expr, $failed_forwards: expr, $failed_fails: expr) => {
+               handle_monitor_err!($self, $err, $channel_state, $entry, $action_type, $resend_raa, $resend_commitment, $failed_forwards, $failed_fails, Vec::new());
+       }
 }
 
 macro_rules! return_monitor_err {
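
The trailing arm added to `handle_monitor_err!` above keeps the existing nine-argument call sites compiling by forwarding to the ten-argument arm with an empty `Vec` for the new `$failed_finalized_fulfills` parameter. A minimal, self-contained illustration of that defaulting pattern (the `count_pending!` macro is invented for this sketch and is not part of the patch):

    // The short arm forwards to the long arm, defaulting the new slot to an
    // empty Vec so pre-existing call sites need no changes.
    macro_rules! count_pending {
        ($fails:expr) => {
            count_pending!($fails, Vec::<u8>::new())
        };
        ($fails:expr, $finalized:expr) => {
            ($fails.len(), $finalized.len())
        };
    }

    fn main() {
        assert_eq!(count_pending!(vec![1, 2, 3]), (3, 0)); // finalized list defaulted
        assert_eq!(count_pending!(vec![1], vec![7u8, 8]), (1, 2));
    }
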
@@ -1441,7 +1445,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                        if let Some(monitor_update) = monitor_update {
                                                if let Err(e) = self.chain_monitor.update_channel(chan_entry.get().get_funding_txo().unwrap(), monitor_update) {
                                                        let (result, is_permanent) =
-                                                               handle_monitor_err!(self, e, channel_state.short_to_id, chan_entry.get_mut(), RAACommitmentOrder::CommitmentFirst, false, false, Vec::new(), Vec::new(), chan_entry.key());
+                                                               handle_monitor_err!(self, e, channel_state.short_to_id, chan_entry.get_mut(), RAACommitmentOrder::CommitmentFirst, false, false, Vec::new(), Vec::new(), Vec::new(), chan_entry.key());
                                                        if is_permanent {
                                                                remove_channel!(channel_state, chan_entry);
                                                                break result;
@@ -2846,7 +2850,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                let ret_err = match res {
                        Ok(Some((update_fee, commitment_signed, monitor_update))) => {
                                if let Err(e) = self.chain_monitor.update_channel(chan.get_funding_txo().unwrap(), monitor_update) {
-                                       let (res, drop) = handle_monitor_err!(self, e, short_to_id, chan, RAACommitmentOrder::CommitmentFirst, false, true, Vec::new(), Vec::new(), chan_id);
+                                       let (res, drop) = handle_monitor_err!(self, e, short_to_id, chan, RAACommitmentOrder::CommitmentFirst, false, true, Vec::new(), Vec::new(), Vec::new(), chan_id);
                                        if drop { retain_channel = false; }
                                        res
                                } else {
@@ -3404,27 +3408,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                self.our_network_pubkey.clone()
        }
 
-       /// Restores a single, given channel to normal operation after a
-       /// ChannelMonitorUpdateErr::TemporaryFailure was returned from a channel monitor update
-       /// operation.
-       ///
-       /// All ChannelMonitor updates up to and including highest_applied_update_id must have been
-       /// fully committed in every copy of the given channels' ChannelMonitors.
-       ///
-       /// Note that there is no effect to calling with a highest_applied_update_id other than the
-       /// current latest ChannelMonitorUpdate and one call to this function after multiple
-       /// ChannelMonitorUpdateErr::TemporaryFailures is fine. The highest_applied_update_id field
-       /// exists largely only to prevent races between this and concurrent update_monitor calls.
-       ///
-       /// Thus, the anticipated use is, at a high level:
-       ///  1) You register a chain::Watch with this ChannelManager,
-       ///  2) it stores each update to disk, and begins updating any remote (eg watchtower) copies of
-       ///     said ChannelMonitors as it can, returning ChannelMonitorUpdateErr::TemporaryFailures
-       ///     any time it cannot do so instantly,
-       ///  3) update(s) are applied to each remote copy of a ChannelMonitor,
-       ///  4) once all remote copies are updated, you call this function with the update_id that
-       ///     completed, and once it is the latest the Channel will be re-enabled.
-       pub fn channel_monitor_updated(&self, funding_txo: &OutPoint, highest_applied_update_id: u64) {
+       fn channel_monitor_updated(&self, funding_txo: &OutPoint, highest_applied_update_id: u64) {
                let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);
 
                let chan_restoration_res;
@@ -3439,8 +3423,8 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                return;
                        }
 
-                       let (raa, commitment_update, order, pending_forwards, pending_failures, funding_broadcastable, funding_locked) = channel.get_mut().monitor_updating_restored(&self.logger);
-                       let channel_update = if funding_locked.is_some() && channel.get().is_usable() && !channel.get().should_announce() {
+                       let updates = channel.get_mut().monitor_updating_restored(&self.logger);
+                       let channel_update = if updates.funding_locked.is_some() && channel.get().is_usable() && !channel.get().should_announce() {
                                // We only send a channel_update in the case where we are just now sending a
                                // funding_locked and the channel is in a usable state. Further, we rely on the
                                // normal announcement_signatures process to send a channel_update for public
@@ -3450,11 +3434,12 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                        msg: self.get_channel_update_for_unicast(channel.get()).unwrap(),
                                })
                        } else { None };
-                       chan_restoration_res = handle_chan_restoration_locked!(self, channel_lock, channel_state, channel, raa, commitment_update, order, None, pending_forwards, funding_broadcastable, funding_locked);
+                       // TODO: Handle updates.finalized_claimed_htlcs!
+                       chan_restoration_res = handle_chan_restoration_locked!(self, channel_lock, channel_state, channel, updates.raa, updates.commitment_update, updates.order, None, updates.accepted_htlcs, updates.funding_broadcastable, updates.funding_locked);
                        if let Some(upd) = channel_update {
                                channel_state.pending_msg_events.push(upd);
                        }
-                       pending_failures
+                       updates.failed_htlcs
                };
                post_handle_chan_restoration!(self, chan_restoration_res);
                for failure in pending_failures.drain(..) {
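
`monitor_updating_restored` previously returned a seven-element tuple; it now returns a single struct. That struct is defined in ln/channel.rs and is not part of this diff, but judging from the field accesses above (`updates.raa`, `updates.commitment_update`, `updates.order`, `updates.accepted_htlcs`, `updates.failed_htlcs`, `updates.finalized_claimed_htlcs`, `updates.funding_broadcastable`, `updates.funding_locked`) and the Vec types named in the `handle_monitor_err!` log line, it presumably looks roughly like:

    // Inferred shape only -- the real definition lives in ln/channel.rs and may
    // differ in visibility, field order, or exact types.
    pub(super) struct MonitorRestoreUpdates {
        pub raa: Option<msgs::RevokeAndACK>,
        pub commitment_update: Option<msgs::CommitmentUpdate>,
        pub order: RAACommitmentOrder,
        pub accepted_htlcs: Vec<(PendingHTLCInfo, u64)>,
        pub failed_htlcs: Vec<(HTLCSource, PaymentHash, HTLCFailReason)>,
        pub finalized_claimed_htlcs: Vec<HTLCSource>,
        pub funding_broadcastable: Option<Transaction>,
        pub funding_locked: Option<msgs::FundingLocked>,
    }

Grouping related values into one named struct also makes it much harder to transpose two same-typed tuple elements at a call site, which matters as the set of values grows.
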
@@ -3545,7 +3530,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                        // hasn't persisted to disk yet - we can't lose money on a transaction that we haven't
                                        // accepted payment from yet. We do, however, need to wait to send our funding_locked
                                        // until we have persisted our monitor.
-                                       chan.monitor_update_failed(false, false, Vec::new(), Vec::new());
+                                       chan.monitor_update_failed(false, false, Vec::new(), Vec::new(), Vec::new());
                                },
                        }
                }
@@ -3663,7 +3648,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                        if let Some(monitor_update) = monitor_update {
                                                if let Err(e) = self.chain_monitor.update_channel(chan_entry.get().get_funding_txo().unwrap(), monitor_update) {
                                                        let (result, is_permanent) =
-                                                               handle_monitor_err!(self, e, channel_state.short_to_id, chan_entry.get_mut(), RAACommitmentOrder::CommitmentFirst, false, false, Vec::new(), Vec::new(), chan_entry.key());
+                                                               handle_monitor_err!(self, e, channel_state.short_to_id, chan_entry.get_mut(), RAACommitmentOrder::CommitmentFirst, false, false, Vec::new(), Vec::new(), Vec::new(), chan_entry.key());
                                                        if is_permanent {
                                                                remove_channel!(channel_state, chan_entry);
                                                                break result;
@@ -3950,26 +3935,36 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                                break Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!".to_owned(), msg.channel_id));
                                        }
                                        let was_frozen_for_monitor = chan.get().is_awaiting_monitor_update();
-                                       let (commitment_update, pending_forwards, pending_failures, monitor_update, htlcs_to_fail_in) =
-                                               break_chan_entry!(self, chan.get_mut().revoke_and_ack(&msg, &self.logger), channel_state, chan);
-                                       htlcs_to_fail = htlcs_to_fail_in;
-                                       if let Err(e) = self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), monitor_update) {
+                                       let raa_updates = break_chan_entry!(self,
+                                               chan.get_mut().revoke_and_ack(&msg, &self.logger), channel_state, chan);
+                                       htlcs_to_fail = raa_updates.holding_cell_failed_htlcs;
+                                       if let Err(e) = self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), raa_updates.monitor_update) {
                                                if was_frozen_for_monitor {
-                                                       assert!(commitment_update.is_none() && pending_forwards.is_empty() && pending_failures.is_empty());
+                                                       assert!(raa_updates.commitment_update.is_none());
+                                                       assert!(raa_updates.accepted_htlcs.is_empty());
+                                                       assert!(raa_updates.failed_htlcs.is_empty());
+                                                       assert!(raa_updates.finalized_claimed_htlcs.is_empty());
                                                        break Err(MsgHandleErrInternal::ignore_no_close("Previous monitor update failure prevented responses to RAA".to_owned()));
                                                } else {
-                                                       if let Err(e) = handle_monitor_err!(self, e, channel_state, chan, RAACommitmentOrder::CommitmentFirst, false, commitment_update.is_some(), pending_forwards, pending_failures) {
+                                                       if let Err(e) = handle_monitor_err!(self, e, channel_state, chan,
+                                                                       RAACommitmentOrder::CommitmentFirst, false,
+                                                                       raa_updates.commitment_update.is_some(),
+                                                                       raa_updates.accepted_htlcs, raa_updates.failed_htlcs,
+                                                                       raa_updates.finalized_claimed_htlcs) {
                                                                break Err(e);
                                                        } else { unreachable!(); }
                                                }
                                        }
-                                       if let Some(updates) = commitment_update {
+                                       if let Some(updates) = raa_updates.commitment_update {
                                                channel_state.pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs {
                                                        node_id: counterparty_node_id.clone(),
                                                        updates,
                                                });
                                        }
-                                       break Ok((pending_forwards, pending_failures, chan.get().get_short_channel_id().expect("RAA should only work on a short-id-available channel"), chan.get().get_funding_txo().unwrap()))
+                                       break Ok((raa_updates.accepted_htlcs, raa_updates.failed_htlcs,
+                                                       chan.get().get_short_channel_id()
+                                                               .expect("RAA should only work on a short-id-available channel"),
+                                                       chan.get().get_funding_txo().unwrap()))
                                },
                                hash_map::Entry::Vacant(_) => break Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel".to_owned(), msg.channel_id))
                        }
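
Similarly, `revoke_and_ack` now returns a struct instead of a tuple. Its shape, inferred from the `raa_updates.*` accesses above (again defined in ln/channel.rs, not shown in this diff), is presumably roughly:

    // Inferred shape only.
    pub(super) struct RAAUpdates {
        pub commitment_update: Option<msgs::CommitmentUpdate>,
        pub accepted_htlcs: Vec<(PendingHTLCInfo, u64)>,
        pub failed_htlcs: Vec<(HTLCSource, PaymentHash, HTLCFailReason)>,
        pub finalized_claimed_htlcs: Vec<HTLCSource>,
        pub monitor_update: ChannelMonitorUpdate,
        pub holding_cell_failed_htlcs: Vec<(HTLCSource, PaymentHash)>,
    }
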
@@ -4129,7 +4124,8 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                                self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), htlc_update.source, &htlc_update.payment_hash, HTLCFailReason::Reason { failure_code: 0x4000 | 8, data: Vec::new() });
                                        }
                                },
-                               MonitorEvent::CommitmentTxConfirmed(funding_outpoint) => {
+                               MonitorEvent::CommitmentTxConfirmed(funding_outpoint) |
+                               MonitorEvent::UpdateFailed(funding_outpoint) => {
                                        let mut channel_lock = self.channel_state.lock().unwrap();
                                        let channel_state = &mut *channel_lock;
                                        let by_id = &mut channel_state.by_id;
@@ -4145,7 +4141,12 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                                                msg: update
                                                        });
                                                }
-                                               self.issue_channel_close_events(&chan, ClosureReason::CommitmentTxConfirmed);
+                                               let reason = if let MonitorEvent::UpdateFailed(_) = monitor_event {
+                                                       ClosureReason::ProcessingError { err: "Failed to persist ChannelMonitor update during chain sync".to_string() }
+                                               } else {
+                                                       ClosureReason::CommitmentTxConfirmed
+                                               };
+                                               self.issue_channel_close_events(&chan, reason);
                                                pending_msg_events.push(events::MessageSendEvent::HandleError {
                                                        node_id: chan.get_counterparty_node_id(),
                                                        action: msgs::ErrorAction::SendErrorMessage {
@@ -4154,6 +4155,9 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                                });
                                        }
                                },
+                               MonitorEvent::UpdateCompleted { funding_txo, monitor_update_id } => {
+                                       self.channel_monitor_updated(&funding_txo, monitor_update_id);
+                               },
                        }
                }
 
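With `channel_monitor_updated` now private, the way for persistence code to un-pause a channel after a `TemporaryFailure` is to surface `MonitorEvent::UpdateCompleted` from `chain::Watch::release_pending_monitor_events`; `process_pending_monitor_events` above then calls `channel_monitor_updated` internally. A minimal sketch of that plumbing, assuming an external asynchronous persister (the `DeferredWatch` type and its `persist_completed` hook are invented for illustration; a real implementation would also store and update the monitors themselves):

    use std::sync::Mutex;
    use lightning::chain;
    use lightning::chain::channelmonitor::{
        ChannelMonitor, ChannelMonitorUpdate, ChannelMonitorUpdateErr, MonitorEvent,
    };
    use lightning::chain::keysinterface::Sign;
    use lightning::chain::transaction::OutPoint;

    struct DeferredWatch {
        pending_events: Mutex<Vec<MonitorEvent>>,
    }

    impl DeferredWatch {
        /// Called by the persister once the update with `monitor_update_id` (and
        /// all prior updates) is durably stored in every copy of the monitor.
        fn persist_completed(&self, funding_txo: OutPoint, monitor_update_id: u64) {
            self.pending_events.lock().unwrap()
                .push(MonitorEvent::UpdateCompleted { funding_txo, monitor_update_id });
        }
    }

    impl<Signer: Sign> chain::Watch<Signer> for DeferredWatch {
        fn watch_channel(&self, _funding_txo: OutPoint, _monitor: ChannelMonitor<Signer>)
                -> Result<(), ChannelMonitorUpdateErr> {
            // Hand the initial monitor to the persister; keep the channel paused
            // until persistence completes.
            Err(ChannelMonitorUpdateErr::TemporaryFailure)
        }
        fn update_channel(&self, _funding_txo: OutPoint, _update: ChannelMonitorUpdate)
                -> Result<(), ChannelMonitorUpdateErr> {
            // Queue the update with the persister and report a temporary failure;
            // the ChannelManager resumes the channel on UpdateCompleted.
            Err(ChannelMonitorUpdateErr::TemporaryFailure)
        }
        fn release_pending_monitor_events(&self) -> Vec<MonitorEvent> {
            self.pending_events.lock().unwrap().split_off(0)
        }
    }

As the documentation removed above noted, all updates up to and including `monitor_update_id` must be fully committed in every copy of the monitor before the completion event is released.
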
@@ -4164,6 +4168,14 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                has_pending_monitor_events
        }
 
+       /// In chanmon_consistency_target, we'd like to be able to restore monitor updating without
+       /// handling all pending events (i.e. without handling PendingHTLCsForwardable events). Thus,
+       /// we expose monitor update events as a separate process method here.
+       #[cfg(feature = "fuzztarget")]
+       pub fn process_monitor_events(&self) {
+               self.process_pending_monitor_events();
+       }
+
        /// Check the holding cell in each channel and free any pending HTLCs in them if possible.
        /// Returns whether there were any updates such as if pending HTLCs were freed or a monitor
        /// update was applied.
@@ -4192,7 +4204,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                                if let Some((commitment_update, monitor_update)) = commitment_opt {
                                                        if let Err(e) = self.chain_monitor.update_channel(chan.get_funding_txo().unwrap(), monitor_update) {
                                                                has_monitor_update = true;
-                                                               let (res, close_channel) = handle_monitor_err!(self, e, short_to_id, chan, RAACommitmentOrder::CommitmentFirst, false, true, Vec::new(), Vec::new(), channel_id);
+                                                               let (res, close_channel) = handle_monitor_err!(self, e, short_to_id, chan, RAACommitmentOrder::CommitmentFirst, false, true, Vec::new(), Vec::new(), Vec::new(), channel_id);
                                                                handle_errors.push((chan.get_counterparty_node_id(), res));
                                                                if close_channel { return false; }
                                                        } else {
@@ -5468,20 +5480,25 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> Writeable f
 ///
 /// At a high-level, the process for deserializing a ChannelManager and resuming normal operation
 /// is:
-/// 1) Deserialize all stored ChannelMonitors.
-/// 2) Deserialize the ChannelManager by filling in this struct and calling:
-///    <(BlockHash, ChannelManager)>::read(reader, args)
-///    This may result in closing some Channels if the ChannelMonitor is newer than the stored
-///    ChannelManager state to ensure no loss of funds. Thus, transactions may be broadcasted.
-/// 3) If you are not fetching full blocks, register all relevant ChannelMonitor outpoints the same
-///    way you would handle a `chain::Filter` call using ChannelMonitor::get_outputs_to_watch() and
-///    ChannelMonitor::get_funding_txo().
-/// 4) Reconnect blocks on your ChannelMonitors.
-/// 5) Disconnect/connect blocks on the ChannelManager.
-/// 6) Move the ChannelMonitors into your local chain::Watch.
+/// 1) Deserialize all stored [`ChannelMonitor`]s.
+/// 2) Deserialize the [`ChannelManager`] by filling in this struct and calling:
+///    `<(BlockHash, ChannelManager)>::read(reader, args)`
+///    This may result in closing some channels if the [`ChannelMonitor`] is newer than the stored
+///    [`ChannelManager`] state, to ensure no loss of funds. Thus, transactions may be broadcast.
+/// 3) If you are not fetching full blocks, register all relevant [`ChannelMonitor`] outpoints the
+///    same way you would handle a [`chain::Filter`] call using
+///    [`ChannelMonitor::get_outputs_to_watch`] and [`ChannelMonitor::get_funding_txo`].
+/// 4) Reconnect blocks on your [`ChannelMonitor`]s.
+/// 5) Disconnect/connect blocks on the [`ChannelManager`].
+/// 6) Re-persist the [`ChannelMonitor`]s to ensure the latest state is on disk.
+///    Note that if you're using a [`ChainMonitor`] for your [`chain::Watch`] implementation, you
+///    will likely accomplish this as a side-effect of calling [`chain::Watch::watch_channel`] in
+///    the next step.
+/// 7) Move the [`ChannelMonitor`]s into your local [`chain::Watch`]. If you're using a
+///    [`ChainMonitor`], this is done by calling [`chain::Watch::watch_channel`].
 ///
-/// Note that the ordering of #4-6 is not of importance, however all three must occur before you
-/// call any other methods on the newly-deserialized ChannelManager.
+/// Note that the ordering of steps #4-7 is not important; however, all four must occur before you
+/// call any other methods on the newly-deserialized [`ChannelManager`].
 ///
 /// Note that because some channels may be closed during deserialization, it is critical that you
 /// always deserialize only the latest version of a ChannelManager and ChannelMonitors available to
@@ -5489,6 +5506,8 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> Writeable f
 /// broadcast), and then later deserialize a newer version of the same ChannelManager (which will
 /// not force-close the same channels but consider them live), you may end up revoking a state for
 /// which you've already broadcasted the transaction.
+///
+/// [`ChainMonitor`]: crate::chain::chainmonitor::ChainMonitor
 pub struct ChannelManagerReadArgs<'a, Signer: 'a + Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
        where M::Target: chain::Watch<Signer>,
         T::Target: BroadcasterInterface,
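
For reference, the resumption flow described in steps 1-7 above looks roughly like the following. This is a schematic sketch, not code from the patch: `read_monitors_from_disk` and `open_manager_bytes` are hypothetical stand-ins for your storage layer, `MyManager` is a type alias you would define for `ChannelManager` with your concrete `Deref` parameters, and `keys_manager`, `fee_estimator`, `chain_monitor`, `tx_broadcaster`, and `logger` are assumed to already be in scope:

    use bitcoin::hash_types::BlockHash;
    use lightning::chain::Watch;
    use lightning::chain::channelmonitor::ChannelMonitor;
    use lightning::chain::keysinterface::InMemorySigner;
    use lightning::ln::channelmanager::ChannelManagerReadArgs;
    use lightning::util::config::UserConfig;
    use lightning::util::ser::ReadableArgs;

    // Step 1: deserialize all stored ChannelMonitors.
    let mut monitors: Vec<ChannelMonitor<InMemorySigner>> = read_monitors_from_disk()?;

    // Step 2: deserialize the ChannelManager, handing it mutable references to
    // every monitor so it can detect (and act on) stale channel state.
    let read_args = ChannelManagerReadArgs::new(
        keys_manager, fee_estimator, chain_monitor.clone(), tx_broadcaster, logger,
        UserConfig::default(), monitors.iter_mut().collect());
    let (block_hash, manager): (BlockHash, MyManager) =
        ReadableArgs::read(&mut open_manager_bytes()?, read_args)?;

    // Steps 3-5: register the monitors' outpoints with your chain::Filter (if
    // any), then replay blocks on the monitors and the manager to the chain tip.

    // Steps 6-7: watch_channel re-persists each monitor and hands it to the
    // chain::Watch, which begins watching the chain on its behalf.
    for monitor in monitors.drain(..) {
        let funding_txo = monitor.get_funding_txo().0;
        chain_monitor.watch_channel(funding_txo, monitor).unwrap();
    }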