diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs
index 5b29828e..6298a0d9 100644
--- a/lightning/src/ln/channelmanager.rs
+++ b/lightning/src/ln/channelmanager.rs
@@ -199,6 +199,7 @@ pub(crate) enum HTLCSource {
 		/// doing a double-pass on route when we get a failure back
 		first_hop_htlc_msat: u64,
 		payment_id: PaymentId,
+		payment_secret: Option<PaymentSecret>,
 	},
 }
 #[cfg(test)]
@@ -209,6 +210,7 @@ impl HTLCSource {
 			session_priv: SecretKey::from_slice(&[1; 32]).unwrap(),
 			first_hop_htlc_msat: 0,
 			payment_id: PaymentId([2; 32]),
+			payment_secret: None,
 		}
 	}
 }
@@ -416,19 +418,51 @@ pub(crate) enum PendingOutboundPayment {
 		/// Our best known block height at the time this payment was initiated.
 		starting_block_height: u32,
 	},
+	/// When a pending payment is fulfilled, we continue tracking it until all pending HTLCs have
+	/// been resolved. This ensures we don't look up pending payments in ChannelMonitors on restart
+	/// and add a pending payment that was already fulfilled.
+	Fulfilled {
+		session_privs: HashSet<[u8; 32]>,
+	},
 }
 
 impl PendingOutboundPayment {
-	fn remove(&mut self, session_priv: &[u8; 32], part_amt_msat: u64) -> bool {
+	fn is_retryable(&self) -> bool {
+		match self {
+			PendingOutboundPayment::Retryable { .. } => true,
+			_ => false,
+		}
+	}
+	fn is_fulfilled(&self) -> bool {
+		match self {
+			PendingOutboundPayment::Fulfilled { .. } => true,
+			_ => false,
+		}
+	}
+
+	fn mark_fulfilled(&mut self) {
+		let mut session_privs = HashSet::new();
+		core::mem::swap(&mut session_privs, match self {
+			PendingOutboundPayment::Legacy { session_privs } |
+			PendingOutboundPayment::Retryable { session_privs, .. } |
+			PendingOutboundPayment::Fulfilled { session_privs }
+				=> session_privs
+		});
+		*self = PendingOutboundPayment::Fulfilled { session_privs };
+	}
+
+	/// panics if part_amt_msat is None and !self.is_fulfilled
+	fn remove(&mut self, session_priv: &[u8; 32], part_amt_msat: Option<u64>) -> bool {
 		let remove_res = match self {
 			PendingOutboundPayment::Legacy { session_privs } |
-			PendingOutboundPayment::Retryable { session_privs, .. } => {
+			PendingOutboundPayment::Retryable { session_privs, .. } |
+			PendingOutboundPayment::Fulfilled { session_privs } => {
 				session_privs.remove(session_priv)
 			}
 		};
 		if remove_res {
 			if let PendingOutboundPayment::Retryable { ref mut pending_amt_msat, .. } = self {
-				*pending_amt_msat -= part_amt_msat;
+				*pending_amt_msat -= part_amt_msat.expect("We must only not provide an amount if the payment was already fulfilled");
 			}
 		}
 		remove_res
@@ -440,6 +474,7 @@ impl PendingOutboundPayment {
 			PendingOutboundPayment::Retryable { session_privs, .. } => {
 				session_privs.insert(session_priv)
 			}
+			PendingOutboundPayment::Fulfilled { .. } => false
 		};
 		if insert_res {
 			if let PendingOutboundPayment::Retryable { ref mut pending_amt_msat, .. } = self {
@@ -452,7 +487,8 @@ impl PendingOutboundPayment {
 	fn remaining_parts(&self) -> usize {
 		match self {
 			PendingOutboundPayment::Legacy { session_privs } |
-			PendingOutboundPayment::Retryable { session_privs, .. } => {
+			PendingOutboundPayment::Retryable { session_privs, .. } |
+			PendingOutboundPayment::Fulfilled { session_privs } => {
 				session_privs.len()
 			}
 		}
 	}
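Taken together, the new variant and helpers form a small state machine: parts keyed by session_priv, a one-way flip to Fulfilled that keeps the part set intact, and removal of parts until none remain. Below is a minimal, self-contained sketch of that lifecycle, using a simplified stand-in for the real PendingOutboundPayment (amounts, payment hashes and the Legacy variant are omitted):

    use std::collections::HashSet;

    // Simplified stand-in for the real PendingOutboundPayment enum.
    enum Payment {
        // Payment still in flight; parts identified by their session_privs.
        Retryable { session_privs: HashSet<[u8; 32]> },
        // Preimage seen: stop retrying, but keep tracking parts until the
        // HTLCs are irrevocably resolved so a restart can't re-add them.
        Fulfilled { session_privs: HashSet<[u8; 32]> },
    }

    impl Payment {
        fn mark_fulfilled(&mut self) {
            // Same swap trick as the diff: steal the set, then replace self.
            let mut session_privs = HashSet::new();
            std::mem::swap(&mut session_privs, match self {
                Payment::Retryable { session_privs } |
                Payment::Fulfilled { session_privs } => session_privs,
            });
            *self = Payment::Fulfilled { session_privs };
        }
        fn remove(&mut self, part: &[u8; 32]) -> bool {
            match self {
                Payment::Retryable { session_privs } |
                Payment::Fulfilled { session_privs } => session_privs.remove(part),
            }
        }
        fn remaining_parts(&self) -> usize {
            match self {
                Payment::Retryable { session_privs } |
                Payment::Fulfilled { session_privs } => session_privs.len(),
            }
        }
    }

    fn main() {
        let mut payment = Payment::Retryable {
            session_privs: [[1u8; 32], [2u8; 32]].iter().cloned().collect(),
        };
        // The preimage arrives: flip to Fulfilled, keeping both parts tracked.
        payment.mark_fulfilled();
        assert!(payment.remove(&[1u8; 32]));
        // Only once every part is resolved may the entry be dropped entirely.
        assert!(payment.remove(&[2u8; 32]));
        assert_eq!(payment.remaining_parts(), 0);
    }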
@@ -1002,7 +1038,7 @@ macro_rules! handle_monitor_err {
 	($self: ident, $err: expr, $channel_state: expr, $entry: expr, $action_type: path, $resend_raa: expr, $resend_commitment: expr) => {
 		handle_monitor_err!($self, $err, $channel_state, $entry, $action_type, $resend_raa, $resend_commitment, Vec::new(), Vec::new())
 	};
-	($self: ident, $err: expr, $short_to_id: expr, $chan: expr, $action_type: path, $resend_raa: expr, $resend_commitment: expr, $failed_forwards: expr, $failed_fails: expr, $chan_id: expr) => {
+	($self: ident, $err: expr, $short_to_id: expr, $chan: expr, $action_type: path, $resend_raa: expr, $resend_commitment: expr, $failed_forwards: expr, $failed_fails: expr, $failed_finalized_fulfills: expr, $chan_id: expr) => {
 		match $err {
 			ChannelMonitorUpdateErr::PermanentFailure => {
 				log_error!($self.logger, "Closing channel {} due to monitor update ChannelMonitorUpdateErr::PermanentFailure", log_bytes!($chan_id[..]));
@@ -1023,7 +1059,7 @@ macro_rules! handle_monitor_err {
 				(res, true)
 			},
 			ChannelMonitorUpdateErr::TemporaryFailure => {
-				log_info!($self.logger, "Disabling channel {} due to monitor update TemporaryFailure. On restore will send {} and process {} forwards and {} fails",
+				log_info!($self.logger, "Disabling channel {} due to monitor update TemporaryFailure. On restore will send {} and process {} forwards, {} fails, and {} fulfill finalizations",
 					log_bytes!($chan_id[..]),
 					if $resend_commitment && $resend_raa {
 							match $action_type {
@@ -1034,25 +1070,29 @@ macro_rules! handle_monitor_err {
 					else if $resend_raa { "RAA" }
 					else { "nothing" },
 					(&$failed_forwards as &Vec<(PendingHTLCInfo, u64)>).len(),
-					(&$failed_fails as &Vec<(HTLCSource, PaymentHash, HTLCFailReason)>).len());
+					(&$failed_fails as &Vec<(HTLCSource, PaymentHash, HTLCFailReason)>).len(),
+					(&$failed_finalized_fulfills as &Vec<HTLCSource>).len());
 				if !$resend_commitment {
 					debug_assert!($action_type == RAACommitmentOrder::RevokeAndACKFirst || !$resend_raa);
 				}
 				if !$resend_raa {
 					debug_assert!($action_type == RAACommitmentOrder::CommitmentFirst || !$resend_commitment);
 				}
-				$chan.monitor_update_failed($resend_raa, $resend_commitment, $failed_forwards, $failed_fails);
+				$chan.monitor_update_failed($resend_raa, $resend_commitment, $failed_forwards, $failed_fails, $failed_finalized_fulfills);
 				(Err(MsgHandleErrInternal::from_chan_no_close(ChannelError::Ignore("Failed to update ChannelMonitor".to_owned()), *$chan_id)), false)
 			},
 		}
 	};
-	($self: ident, $err: expr, $channel_state: expr, $entry: expr, $action_type: path, $resend_raa: expr, $resend_commitment: expr, $failed_forwards: expr, $failed_fails: expr) => { {
-		let (res, drop) = handle_monitor_err!($self, $err, $channel_state.short_to_id, $entry.get_mut(), $action_type, $resend_raa, $resend_commitment, $failed_forwards, $failed_fails, $entry.key());
+	($self: ident, $err: expr, $channel_state: expr, $entry: expr, $action_type: path, $resend_raa: expr, $resend_commitment: expr, $failed_forwards: expr, $failed_fails: expr, $failed_finalized_fulfills: expr) => { {
+		let (res, drop) = handle_monitor_err!($self, $err, $channel_state.short_to_id, $entry.get_mut(), $action_type, $resend_raa, $resend_commitment, $failed_forwards, $failed_fails, $failed_finalized_fulfills, $entry.key());
 		if drop { $entry.remove_entry(); }
 		res
 	} };
+	($self: ident, $err: expr, $channel_state: expr, $entry: expr, $action_type: path, $resend_raa: expr, $resend_commitment: expr, $failed_forwards: expr, $failed_fails: expr) => {
+		handle_monitor_err!($self, $err, $channel_state, $entry, $action_type, $resend_raa, $resend_commitment, $failed_forwards, $failed_fails, Vec::new());
+	}
 }
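The hunk above threads one more expression, $failed_finalized_fulfills, through handle_monitor_err!, and keeps the old arity compiling by adding a short arm that delegates with Vec::new() as the default. The arity-overloading pattern in isolation looks like this (a toy macro, not the real one):

    // Toy version of the arity overloading used by handle_monitor_err!:
    // the short form supplies a default and forwards to the long form.
    macro_rules! log_failures {
        ($forwards: expr, $fails: expr) => {
            log_failures!($forwards, $fails, Vec::<u32>::new())
        };
        ($forwards: expr, $fails: expr, $finalized: expr) => {
            println!("{} forwards, {} fails, {} fulfill finalizations",
                $forwards.len(), $fails.len(), $finalized.len())
        };
    }

    fn main() {
        log_failures!(vec![1, 2], vec![3]);          // default third argument
        log_failures!(vec![1], vec![2], vec![3, 4]); // explicit third argument
    }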
 
 macro_rules! return_monitor_err {
@@ -1441,7 +1481,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<Signer, M, T, K, F, L>
 				if let Some(monitor_update) = monitor_update {
 					if let Err(e) = self.chain_monitor.update_channel(chan_entry.get().get_funding_txo().unwrap(), monitor_update) {
 						let (result, is_permanent) =
-							handle_monitor_err!(self, e, channel_state.short_to_id, chan_entry.get_mut(), RAACommitmentOrder::CommitmentFirst, false, false, Vec::new(), Vec::new(), chan_entry.key());
+							handle_monitor_err!(self, e, channel_state.short_to_id, chan_entry.get_mut(), RAACommitmentOrder::CommitmentFirst, false, false, Vec::new(), Vec::new(), Vec::new(), chan_entry.key());
 						if is_permanent {
 							remove_channel!(channel_state, chan_entry);
 							break result;
@@ -1979,6 +2019,17 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<Signer, M, T, K, F, L>
 
 		let err: Result<(), _> = loop {
 			let mut channel_lock = self.channel_state.lock().unwrap();
+
+			let mut pending_outbounds = self.pending_outbound_payments.lock().unwrap();
+			let payment_entry = pending_outbounds.entry(payment_id);
+			if let hash_map::Entry::Occupied(payment) = &payment_entry {
+				if !payment.get().is_retryable() {
+					return Err(APIError::RouteError {
+						err: "Payment already completed"
+					});
+				}
+			}
+
 			let id = match channel_lock.short_to_id.get(&path.first().unwrap().short_channel_id) {
 				None => return Err(APIError::ChannelUnavailable{err: "No channel available with first hop!".to_owned()}),
 				Some(id) => id.clone(),
@@ -1999,11 +2050,11 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<Signer, M, T, K, F, L>
 						session_priv: session_priv.clone(),
 						first_hop_htlc_msat: htlc_msat,
 						payment_id,
+						payment_secret: payment_secret.clone(),
 					}, onion_packet, &self.logger),
 				channel_state, chan);
 
-				let mut pending_outbounds = self.pending_outbound_payments.lock().unwrap();
-				let payment = pending_outbounds.entry(payment_id).or_insert_with(|| PendingOutboundPayment::Retryable {
+				let payment = payment_entry.or_insert_with(|| PendingOutboundPayment::Retryable {
 					session_privs: HashSet::new(),
 					pending_amt_msat: 0,
 					payment_hash: *payment_hash,
@@ -2199,7 +2250,12 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<Signer, M, T, K, F, L>
 					return Err(PaymentSendFailure::ParameterError(APIError::APIMisuseError {
 						err: "Unable to retry payments that were initially sent on LDK versions prior to 0.0.102".to_string()
 					}))
-				}
+				},
+				PendingOutboundPayment::Fulfilled { .. } => {
+					return Err(PaymentSendFailure::ParameterError(APIError::RouteError {
+						err: "Payment already completed"
+					}));
+				},
 			}
 		} else {
 			return Err(PaymentSendFailure::ParameterError(APIError::APIMisuseError {
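Both rejection sites above return the same "Payment already completed" RouteError. The send-side check also holds on to the hash_map::Entry it looked up, so the later or_insert_with needs no second map lookup. A self-contained sketch of that check-then-insert shape, with stand-in types (the real code keys by PaymentId and performs channel work between the check and the insert):

    use std::collections::hash_map::{Entry, HashMap};

    enum State { Retryable, Fulfilled }

    // Sketch of the Entry-based guard: look the payment up once, reject sends
    // against a fulfilled payment, and reuse the same entry for insertion.
    fn send_part(payments: &mut HashMap<u64, State>, payment_id: u64) -> Result<(), &'static str> {
        let entry = payments.entry(payment_id);
        if let Entry::Occupied(ref payment) = entry {
            if let State::Fulfilled = payment.get() {
                return Err("Payment already completed");
            }
        }
        // ... route/channel checks would happen here ...
        entry.or_insert(State::Retryable);
        Ok(())
    }

    fn main() {
        let mut payments = HashMap::new();
        assert!(send_part(&mut payments, 7).is_ok());
        payments.insert(7, State::Fulfilled);
        assert_eq!(send_part(&mut payments, 7), Err("Payment already completed"));
    }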
@@ -2846,7 +2902,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<Signer, M, T, K, F, L>
 				let ret_err = match res {
 					Ok(Some((update_fee, commitment_signed, monitor_update))) => {
 						if let Err(e) = self.chain_monitor.update_channel(chan.get_funding_txo().unwrap(), monitor_update) {
-							let (res, drop) = handle_monitor_err!(self, e, short_to_id, chan, RAACommitmentOrder::CommitmentFirst, false, true, Vec::new(), Vec::new(), chan_id);
+							let (res, drop) = handle_monitor_err!(self, e, short_to_id, chan, RAACommitmentOrder::CommitmentFirst, false, true, Vec::new(), Vec::new(), Vec::new(), chan_id);
 							if drop { retain_channel = false; }
 							res
 						} else {
@@ -3027,7 +3083,9 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<Signer, M, T, K, F, L>
 					session_priv_bytes.copy_from_slice(&session_priv[..]);
 					let mut outbounds = self.pending_outbound_payments.lock().unwrap();
 					if let hash_map::Entry::Occupied(mut payment) = outbounds.entry(payment_id) {
-						if payment.get_mut().remove(&session_priv_bytes, path.last().unwrap().fee_msat) {
+						if payment.get_mut().remove(&session_priv_bytes, Some(path.last().unwrap().fee_msat)) &&
+							!payment.get().is_fulfilled()
+						{
 							self.pending_events.lock().unwrap().push(
 								events::Event::PaymentPathFailed {
 									payment_hash,
@@ -3072,12 +3130,16 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<Signer, M, T, K, F, L>
 				session_priv_bytes.copy_from_slice(&session_priv[..]);
 				let mut outbounds = self.pending_outbound_payments.lock().unwrap();
 				let mut all_paths_failed = false;
-				if let hash_map::Entry::Occupied(mut sessions) = outbounds.entry(payment_id) {
-					if !sessions.get_mut().remove(&session_priv_bytes, path.last().unwrap().fee_msat) {
+				if let hash_map::Entry::Occupied(mut payment) = outbounds.entry(payment_id) {
+					if !payment.get_mut().remove(&session_priv_bytes, Some(path.last().unwrap().fee_msat)) {
 						log_trace!(self.logger, "Received duplicative fail for HTLC with payment_hash {}", log_bytes!(payment_hash.0));
 						return;
 					}
-					if sessions.get().remaining_parts() == 0 {
+					if payment.get().is_fulfilled() {
+						log_trace!(self.logger, "Received failure of HTLC with payment_hash {} after payment completion", log_bytes!(payment_hash.0));
+						return;
+					}
+					if payment.get().remaining_parts() == 0 {
 						all_paths_failed = true;
 					}
 				} else {
@@ -3325,6 +3387,23 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<Signer, M, T, K, F, L>
 		} else { unreachable!(); }
 	}
 
+	fn finalize_claims(&self, mut sources: Vec<HTLCSource>) {
+		for source in sources.drain(..) {
+			if let HTLCSource::OutboundRoute { session_priv, payment_id, .. } = source {
+				let mut session_priv_bytes = [0; 32];
+				session_priv_bytes.copy_from_slice(&session_priv[..]);
+				let mut outbounds = self.pending_outbound_payments.lock().unwrap();
+				if let hash_map::Entry::Occupied(mut payment) = outbounds.entry(payment_id) {
+					assert!(payment.get().is_fulfilled());
+					payment.get_mut().remove(&session_priv_bytes, None);
+					if payment.get().remaining_parts() == 0 {
+						payment.remove();
+					}
+				}
+			}
+		}
+	}
+
 	fn claim_funds_internal(&self, mut channel_state_lock: MutexGuard<ChannelHolder<Signer>>, source: HTLCSource, payment_preimage: PaymentPreimage, forwarded_htlc_value_msat: Option<u64>, from_onchain: bool) {
 		match source {
 			HTLCSource::OutboundRoute { session_priv, payment_id, path, .. } => {
@@ -3332,8 +3411,22 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<Signer, M, T, K, F, L>
 				let mut session_priv_bytes = [0; 32];
 				session_priv_bytes.copy_from_slice(&session_priv[..]);
 				let mut outbounds = self.pending_outbound_payments.lock().unwrap();
-				let found_payment = if let Some(mut sessions) = outbounds.remove(&payment_id) {
-					sessions.remove(&session_priv_bytes, path.last().unwrap().fee_msat)
+				let found_payment = if let hash_map::Entry::Occupied(mut payment) = outbounds.entry(payment_id) {
+					let found_payment = !payment.get().is_fulfilled();
+					payment.get_mut().mark_fulfilled();
+					if from_onchain {
+						// We currently immediately remove HTLCs which were fulfilled on-chain.
+						// This could potentially lead to removing a pending payment too early,
+						// with a reorg of one block causing us to re-add the fulfilled payment on
+						// restart.
+						// TODO: We should have a second monitor event that informs us of payments
+						// irrevocably fulfilled.
+						payment.get_mut().remove(&session_priv_bytes, Some(path.last().unwrap().fee_msat));
+						if payment.get().remaining_parts() == 0 {
+							payment.remove();
+						}
+					}
+					found_payment
 				} else { false };
 				if found_payment {
 					let payment_hash = PaymentHash(Sha256::hash(&payment_preimage.0).into_inner());
@@ -3404,31 +3497,11 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<Signer, M, T, K, F, L>
 		self.our_network_pubkey.clone()
 	}
 
-	/// Restores a single, given channel to normal operation after a
-	/// ChannelMonitorUpdateErr::TemporaryFailure was returned from a channel monitor update
-	/// operation.
-	///
-	/// All ChannelMonitor updates up to and including highest_applied_update_id must have been
-	/// fully committed in every copy of the given channels' ChannelMonitors.
-	///
-	/// Note that there is no effect to calling with a highest_applied_update_id other than the
-	/// current latest ChannelMonitorUpdate and one call to this function after multiple
-	/// ChannelMonitorUpdateErr::TemporaryFailures is fine. The highest_applied_update_id field
-	/// exists largely only to prevent races between this and concurrent update_monitor calls.
-	///
-	/// Thus, the anticipated use is, at a high level:
-	/// 1) You register a chain::Watch with this ChannelManager,
-	/// 2) it stores each update to disk, and begins updating any remote (eg watchtower) copies of
-	///    said ChannelMonitors as it can, returning ChannelMonitorUpdateErr::TemporaryFailures
-	///    any time it cannot do so instantly,
-	/// 3) update(s) are applied to each remote copy of a ChannelMonitor,
-	/// 4) once all remote copies are updated, you call this function with the update_id that
-	///    completed, and once it is the latest the Channel will be re-enabled.
-	pub fn channel_monitor_updated(&self, funding_txo: &OutPoint, highest_applied_update_id: u64) {
+	fn channel_monitor_updated(&self, funding_txo: &OutPoint, highest_applied_update_id: u64) {
 		let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);
 
 		let chan_restoration_res;
-		let mut pending_failures = {
+		let (mut pending_failures, finalized_claims) = {
 			let mut channel_lock = self.channel_state.lock().unwrap();
 			let channel_state = &mut *channel_lock;
 			let mut channel = match channel_state.by_id.entry(funding_txo.to_channel_id()) {
@@ -3439,8 +3512,8 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<Signer, M, T, K, F, L>
 				return;
 			}
 
-			let (raa, commitment_update, order, pending_forwards, pending_failures, funding_broadcastable, funding_locked) = channel.get_mut().monitor_updating_restored(&self.logger);
-			let channel_update = if funding_locked.is_some() && channel.get().is_usable() && !channel.get().should_announce() {
+			let updates = channel.get_mut().monitor_updating_restored(&self.logger);
+			let channel_update = if updates.funding_locked.is_some() && channel.get().is_usable() && !channel.get().should_announce() {
 				// We only send a channel_update in the case where we are just now sending a
 				// funding_locked and the channel is in a usable state. Further, we rely on the
 				// normal announcement_signatures process to send a channel_update for public
@@ -3450,13 +3523,14 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<Signer, M, T, K, F, L>
 					msg: self.get_channel_update_for_unicast(channel.get()).unwrap(),
 				})
 			} else { None };
-			chan_restoration_res = handle_chan_restoration_locked!(self, channel_lock, channel_state, channel, raa, commitment_update, order, None, pending_forwards, funding_broadcastable, funding_locked);
+			chan_restoration_res = handle_chan_restoration_locked!(self, channel_lock, channel_state, channel, updates.raa, updates.commitment_update, updates.order, None, updates.accepted_htlcs, updates.funding_broadcastable, updates.funding_locked);
 			if let Some(upd) = channel_update {
 				channel_state.pending_msg_events.push(upd);
 			}
-			pending_failures
+			(updates.failed_htlcs, updates.finalized_claimed_htlcs)
 		};
 		post_handle_chan_restoration!(self, chan_restoration_res);
+		self.finalize_claims(finalized_claims);
 		for failure in pending_failures.drain(..) {
 			self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), failure.0, &failure.1, failure.2);
 		}
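monitor_updating_restored previously returned a seven-element tuple; call sites now read named fields off an `updates` value. The struct itself lives in channel.rs and is not shown in this diff, so the field types below are assumptions. The sketch only illustrates the shape of the refactor and why adding finalized_claimed_htlcs did not disturb existing callers:

    // Hypothetical, simplified stand-ins for the real message/HTLC types.
    struct Raa;
    struct CommitmentUpdate;

    // Grouping a wide tuple return into a named struct: call sites read
    // `updates.accepted_htlcs` instead of positionally destructuring seven
    // values, and new fields (finalized_claimed_htlcs here) can be added
    // without touching every caller.
    struct MonitorRestoreUpdates {
        raa: Option<Raa>,
        commitment_update: Option<CommitmentUpdate>,
        accepted_htlcs: Vec<u64>,
        failed_htlcs: Vec<u64>,
        finalized_claimed_htlcs: Vec<u64>,
    }

    fn monitor_updating_restored() -> MonitorRestoreUpdates {
        MonitorRestoreUpdates {
            raa: None,
            commitment_update: None,
            accepted_htlcs: vec![],
            failed_htlcs: vec![],
            finalized_claimed_htlcs: vec![],
        }
    }

    fn main() {
        let updates = monitor_updating_restored();
        println!("{} HTLCs accepted, {} claims to finalize",
            updates.accepted_htlcs.len(), updates.finalized_claimed_htlcs.len());
        let _ = (updates.raa, updates.commitment_update, updates.failed_htlcs);
    }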
@@ -3545,7 +3619,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<Signer, M, T, K, F, L>
 				// hasn't persisted to disk yet - we can't lose money on a transaction that we haven't
 				// accepted payment from yet. We do, however, need to wait to send our funding_locked
 				// until we have persisted our monitor.
-				chan.monitor_update_failed(false, false, Vec::new(), Vec::new());
+				chan.monitor_update_failed(false, false, Vec::new(), Vec::new(), Vec::new());
 			},
 		}
 	}
@@ -3663,7 +3737,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<Signer, M, T, K, F, L>
 				if let Some(monitor_update) = monitor_update {
 					if let Err(e) = self.chain_monitor.update_channel(chan_entry.get().get_funding_txo().unwrap(), monitor_update) {
 						let (result, is_permanent) =
-							handle_monitor_err!(self, e, channel_state.short_to_id, chan_entry.get_mut(), RAACommitmentOrder::CommitmentFirst, false, false, Vec::new(), Vec::new(), chan_entry.key());
+							handle_monitor_err!(self, e, channel_state.short_to_id, chan_entry.get_mut(), RAACommitmentOrder::CommitmentFirst, false, false, Vec::new(), Vec::new(), Vec::new(), chan_entry.key());
 						if is_permanent {
 							remove_channel!(channel_state, chan_entry);
 							break result;
@@ -3950,37 +4024,51 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<Signer, M, T, K, F, L>
 					break Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!".to_owned(), msg.channel_id));
 				}
 				let was_frozen_for_monitor = chan.get().is_awaiting_monitor_update();
-				let (commitment_update, pending_forwards, pending_failures, monitor_update, htlcs_to_fail_in) =
-					break_chan_entry!(self, chan.get_mut().revoke_and_ack(&msg, &self.logger), channel_state, chan);
-				htlcs_to_fail = htlcs_to_fail_in;
-				if let Err(e) = self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), monitor_update) {
+				let raa_updates = break_chan_entry!(self,
+					chan.get_mut().revoke_and_ack(&msg, &self.logger), channel_state, chan);
+				htlcs_to_fail = raa_updates.holding_cell_failed_htlcs;
+				if let Err(e) = self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), raa_updates.monitor_update) {
 					if was_frozen_for_monitor {
-						assert!(commitment_update.is_none() && pending_forwards.is_empty() && pending_failures.is_empty());
+						assert!(raa_updates.commitment_update.is_none());
+						assert!(raa_updates.accepted_htlcs.is_empty());
+						assert!(raa_updates.failed_htlcs.is_empty());
+						assert!(raa_updates.finalized_claimed_htlcs.is_empty());
 						break Err(MsgHandleErrInternal::ignore_no_close("Previous monitor update failure prevented responses to RAA".to_owned()));
 					} else {
-						if let Err(e) = handle_monitor_err!(self, e, channel_state, chan, RAACommitmentOrder::CommitmentFirst, false, commitment_update.is_some(), pending_forwards, pending_failures) {
+						if let Err(e) = handle_monitor_err!(self, e, channel_state, chan,
+								RAACommitmentOrder::CommitmentFirst, false,
+								raa_updates.commitment_update.is_some(),
+								raa_updates.accepted_htlcs, raa_updates.failed_htlcs,
+								raa_updates.finalized_claimed_htlcs) {
 							break Err(e);
 						} else { unreachable!(); }
 					}
 				}
-				if let Some(updates) = commitment_update {
+				if let Some(updates) = raa_updates.commitment_update {
 					channel_state.pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs {
 						node_id: counterparty_node_id.clone(),
 						updates,
 					});
 				}
-				break Ok((pending_forwards, pending_failures, chan.get().get_short_channel_id().expect("RAA should only work on a short-id-available channel"), chan.get().get_funding_txo().unwrap()))
+				break Ok((raa_updates.accepted_htlcs, raa_updates.failed_htlcs,
+					raa_updates.finalized_claimed_htlcs,
+					chan.get().get_short_channel_id()
+						.expect("RAA should only work on a short-id-available channel"),
+					chan.get().get_funding_txo().unwrap()))
 			},
 			hash_map::Entry::Vacant(_) => break Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel".to_owned(), msg.channel_id))
 		}
 	};
 	self.fail_holding_cell_htlcs(htlcs_to_fail, msg.channel_id);
 	match res {
-		Ok((pending_forwards, mut pending_failures, short_channel_id, channel_outpoint)) => {
+		Ok((pending_forwards, mut pending_failures, finalized_claim_htlcs,
+			short_channel_id, channel_outpoint)) =>
+		{
 			for failure in pending_failures.drain(..) {
 				self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), failure.0, &failure.1, failure.2);
 			}
 			self.forward_htlcs(&mut [(short_channel_id, channel_outpoint, pending_forwards)]);
+			self.finalize_claims(finalized_claim_htlcs);
 			Ok(())
 		},
 		Err(e) => Err(e)
@@ -4129,7 +4217,8 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<Signer, M, T, K, F, L>
 					self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), htlc_update.source, &htlc_update.payment_hash, HTLCFailReason::Reason { failure_code: 0x4000 | 8, data: Vec::new() });
 				}
 			},
-			MonitorEvent::CommitmentTxConfirmed(funding_outpoint) => {
+			MonitorEvent::CommitmentTxConfirmed(funding_outpoint) |
+			MonitorEvent::UpdateFailed(funding_outpoint) => {
 				let mut channel_lock = self.channel_state.lock().unwrap();
 				let channel_state = &mut *channel_lock;
 				let by_id = &mut channel_state.by_id;
@@ -4145,7 +4234,12 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<Signer, M, T, K, F, L>
 							msg: update
 						});
 					}
-					self.issue_channel_close_events(&chan, ClosureReason::CommitmentTxConfirmed);
+					let reason = if let MonitorEvent::UpdateFailed(_) = monitor_event {
+						ClosureReason::ProcessingError { err: "Failed to persist ChannelMonitor update during chain sync".to_string() }
+					} else {
+						ClosureReason::CommitmentTxConfirmed
+					};
+					self.issue_channel_close_events(&chan, reason);
 					pending_msg_events.push(events::MessageSendEvent::HandleError {
 						node_id: chan.get_counterparty_node_id(),
 						action: msgs::ErrorAction::SendErrorMessage {
@@ -4154,6 +4248,9 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<Signer, M, T, K, F, L>
 					});
 				}
 			},
+			MonitorEvent::UpdateCompleted { funding_txo, monitor_update_id } => {
+				self.channel_monitor_updated(&funding_txo, monitor_update_id);
+			},
 		}
 	}
 
@@ -4164,6 +4261,14 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<Signer, M, T, K, F, L>
 		has_pending_monitor_events
 	}
 
+	/// In chanmon_consistency_target, we'd like to be able to restore monitor updating without
+	/// handling all pending events (i.e. not PendingHTLCsForwardable). Thus, we expose monitor
+	/// update events as a separate process method here.
+	#[cfg(feature = "fuzztarget")]
+	pub fn process_monitor_events(&self) {
+		self.process_pending_monitor_events();
+	}
+
 	/// Check the holding cell in each channel and free any pending HTLCs in them if possible.
 	/// Returns whether there were any updates such as if pending HTLCs were freed or a monitor
 	/// update was applied.
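Two MonitorEvent changes meet here: UpdateFailed closes the channel like CommitmentTxConfirmed does, but with a ProcessingError closure reason, and the new UpdateCompleted event routes monitor-update completion back into channel_monitor_updated, which is what allowed that method to stop being public earlier in this diff. A simplified dispatch sketch (stand-in types, with u32 in place of OutPoint and println! in place of the real handling):

    // Simplified stand-ins: the real MonitorEvent carries an OutPoint, and
    // dispatch happens inside process_pending_monitor_events.
    enum MonitorEvent {
        CommitmentTxConfirmed(u32),
        UpdateFailed(u32),
        UpdateCompleted { funding_txo: u32, monitor_update_id: u64 },
    }

    fn process_pending_monitor_events(events: Vec<MonitorEvent>) {
        for event in events {
            match event {
                // Both a confirmed counterparty commitment tx and a failed
                // persistence close the channel, with different reasons.
                MonitorEvent::CommitmentTxConfirmed(chan) =>
                    println!("closing {}: commitment tx confirmed", chan),
                MonitorEvent::UpdateFailed(chan) =>
                    println!("closing {}: monitor persistence failed", chan),
                // Completion is surfaced as an event too, so the manager can
                // resume the channel itself rather than exposing a public method.
                MonitorEvent::UpdateCompleted { funding_txo, monitor_update_id } =>
                    println!("restoring {} at update {}", funding_txo, monitor_update_id),
            }
        }
    }

    fn main() {
        process_pending_monitor_events(vec![
            MonitorEvent::UpdateCompleted { funding_txo: 1, monitor_update_id: 42 },
            MonitorEvent::UpdateFailed(2),
        ]);
    }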
@@ -4192,7 +4297,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<Signer, M, T, K, F, L>
 				if let Some((commitment_update, monitor_update)) = commitment_opt {
 					if let Err(e) = self.chain_monitor.update_channel(chan.get_funding_txo().unwrap(), monitor_update) {
 						has_monitor_update = true;
-						let (res, close_channel) = handle_monitor_err!(self, e, short_to_id, chan, RAACommitmentOrder::CommitmentFirst, false, true, Vec::new(), Vec::new(), channel_id);
+						let (res, close_channel) = handle_monitor_err!(self, e, short_to_id, chan, RAACommitmentOrder::CommitmentFirst, false, true, Vec::new(), Vec::new(), Vec::new(), channel_id);
 						handle_errors.push((chan.get_counterparty_node_id(), res));
 						if close_channel { return false; }
 					} else {
@@ -5249,10 +5354,12 @@ impl Readable for HTLCSource {
 				let mut first_hop_htlc_msat: u64 = 0;
 				let mut path = Some(Vec::new());
 				let mut payment_id = None;
+				let mut payment_secret = None;
 				read_tlv_fields!(reader, {
 					(0, session_priv, required),
 					(1, payment_id, option),
 					(2, first_hop_htlc_msat, required),
+					(3, payment_secret, option),
 					(4, path, vec_type),
 				});
 				if payment_id.is_none() {
@@ -5265,6 +5372,7 @@ impl Readable for HTLCSource {
 					first_hop_htlc_msat: first_hop_htlc_msat,
 					path: path.unwrap(),
 					payment_id: payment_id.unwrap(),
+					payment_secret,
 				})
 			}
 			1 => Ok(HTLCSource::PreviousHopData(Readable::read(reader)?)),
@@ -5276,13 +5384,14 @@ impl Writeable for HTLCSource {
 	fn write<W: Writer>(&self, writer: &mut W) -> Result<(), ::io::Error> {
 		match self {
-			HTLCSource::OutboundRoute { ref session_priv, ref first_hop_htlc_msat, ref path, payment_id } => {
+			HTLCSource::OutboundRoute { ref session_priv, ref first_hop_htlc_msat, ref path, payment_id, payment_secret } => {
 				0u8.write(writer)?;
 				let payment_id_opt = Some(payment_id);
 				write_tlv_fields!(writer, {
 					(0, session_priv, required),
 					(1, payment_id_opt, option),
 					(2, first_hop_htlc_msat, required),
+					(3, payment_secret, option),
 					(4, path, vec_type),
 				});
 			}
@@ -5326,10 +5435,13 @@ impl_writeable_tlv_based!(PendingInboundPayment, {
 	(8, min_value_msat, required),
 });
 
-impl_writeable_tlv_based_enum!(PendingOutboundPayment,
+impl_writeable_tlv_based_enum_upgradable!(PendingOutboundPayment,
 	(0, Legacy) => {
 		(0, session_privs, required),
 	},
+	(1, Fulfilled) => {
+		(0, session_privs, required),
+	},
 	(2, Retryable) => {
 		(0, session_privs, required),
 		(2, payment_hash, required),
@@ -5338,7 +5450,7 @@ impl_writeable_tlv_based_enum!(PendingOutboundPayment,
 		(8, pending_amt_msat, required),
 		(10, starting_block_height, required),
 	},
-;);
+);
 
 impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> Writeable for ChannelManager<Signer, M, T, K, F, L>
 	where M::Target: chain::Watch<Signer>,
@@ -5431,7 +5543,9 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> Writeable for ChannelManager<Signer, M, T, K, F, L>
 		// For backwards compat, write the session privs and their total length.
 		let mut num_pending_outbounds_compat: u64 = 0;
 		for (_, outbound) in pending_outbound_payments.iter() {
-			num_pending_outbounds_compat += outbound.remaining_parts() as u64;
+			if !outbound.is_fulfilled() {
+				num_pending_outbounds_compat += outbound.remaining_parts() as u64;
+			}
 		}
 		num_pending_outbounds_compat.write(writer)?;
 		for (_, outbound) in pending_outbound_payments.iter() {
@@ -5442,6 +5556,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> Writeable for ChannelManager<Signer, M, T, K, F, L>
 						session_priv.write(writer)?;
 					}
 				}
+				PendingOutboundPayment::Fulfilled { .. } => {},
 			}
 		}
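payment_secret is written and read as TLV type 3 deliberately: under the "it's okay to be odd" TLV rule, a reader skips unknown odd types and fails only on unknown even ones, so deserializers from before this change tolerate the new field. A self-contained sketch of that rule (single-byte types and lengths for brevity; the real streams use BigSize varints):

    // Sketch of the even/odd TLV rule behind choosing type 3 for
    // payment_secret: readers must understand even types, but silently
    // skip unknown odd ones, keeping new odd fields backwards compatible.
    fn read_tlv_stream(mut data: &[u8], known: &[u8]) -> Result<Vec<(u8, Vec<u8>)>, &'static str> {
        let mut fields = Vec::new();
        while data.len() >= 2 {
            let (typ, len) = (data[0], data[1] as usize);
            if data.len() < 2 + len { return Err("truncated"); }
            let value = data[2..2 + len].to_vec();
            if known.contains(&typ) {
                fields.push((typ, value));
            } else if typ % 2 == 0 {
                // Unknown *even* type: the writer required us to understand it.
                return Err("unknown required TLV");
            } // Unknown odd type: ignore and keep going.
            data = &data[2 + len..];
        }
        Ok(fields)
    }

    fn main() {
        // Type 0 (known) then type 3 (unknown, odd: skipped), as type/len/value.
        let stream = [0, 1, 0xaa, 3, 2, 0xbb, 0xcc];
        let fields = read_tlv_stream(&stream, &[0]).unwrap();
        assert_eq!(fields, vec![(0, vec![0xaa])]);
        // An unknown *even* type is a hard failure.
        assert!(read_tlv_stream(&[4, 1, 0xdd], &[0]).is_err());
    }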
@@ -5452,7 +5567,8 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> Writeable for ChannelManager<Signer, M, T, K, F, L>
 				PendingOutboundPayment::Legacy { session_privs } |
 				PendingOutboundPayment::Retryable { session_privs, .. } => {
 					pending_outbound_payments_no_retry.insert(*id, session_privs.clone());
-				}
+				},
+				_ => {},
 			}
 		}
 		write_tlv_fields!(writer, {
@@ -5468,20 +5584,25 @@
 ///
 /// At a high-level, the process for deserializing a ChannelManager and resuming normal operation
 /// is:
-/// 1) Deserialize all stored ChannelMonitors.
-/// 2) Deserialize the ChannelManager by filling in this struct and calling:
-///    <(BlockHash, ChannelManager)>::read(reader, args)
-///    This may result in closing some Channels if the ChannelMonitor is newer than the stored
-///    ChannelManager state to ensure no loss of funds. Thus, transactions may be broadcasted.
-/// 3) If you are not fetching full blocks, register all relevant ChannelMonitor outpoints the same
-///    way you would handle a `chain::Filter` call using ChannelMonitor::get_outputs_to_watch() and
-///    ChannelMonitor::get_funding_txo().
-/// 4) Reconnect blocks on your ChannelMonitors.
-/// 5) Disconnect/connect blocks on the ChannelManager.
-/// 6) Move the ChannelMonitors into your local chain::Watch.
+/// 1) Deserialize all stored [`ChannelMonitor`]s.
+/// 2) Deserialize the [`ChannelManager`] by filling in this struct and calling:
+///    `<(BlockHash, ChannelManager)>::read(reader, args)`
+///    This may result in closing some channels if the [`ChannelMonitor`] is newer than the stored
+///    [`ChannelManager`] state to ensure no loss of funds. Thus, transactions may be broadcasted.
+/// 3) If you are not fetching full blocks, register all relevant [`ChannelMonitor`] outpoints the
+///    same way you would handle a [`chain::Filter`] call using
+///    [`ChannelMonitor::get_outputs_to_watch`] and [`ChannelMonitor::get_funding_txo`].
+/// 4) Reconnect blocks on your [`ChannelMonitor`]s.
+/// 5) Disconnect/connect blocks on the [`ChannelManager`].
+/// 6) Re-persist the [`ChannelMonitor`]s to ensure the latest state is on disk.
+///    Note that if you're using a [`ChainMonitor`] for your [`chain::Watch`] implementation, you
+///    will likely accomplish this as a side-effect of calling [`chain::Watch::watch_channel`] in
+///    the next step.
+/// 7) Move the [`ChannelMonitor`]s into your local [`chain::Watch`]. If you're using a
+///    [`ChainMonitor`], this is done by calling [`chain::Watch::watch_channel`].
 ///
-/// Note that the ordering of #4-6 is not of importance, however all three must occur before you
-/// call any other methods on the newly-deserialized ChannelManager.
+/// Note that the ordering of #4-7 is not of importance, however all four must occur before you
+/// call any other methods on the newly-deserialized [`ChannelManager`].
 ///
 /// Note that because some channels may be closed during deserialization, it is critical that you
 /// always deserialize only the latest version of a ChannelManager and ChannelMonitors available to
@@ -5489,6 +5610,8 @@
 /// broadcast), and then later deserialize a newer version of the same ChannelManager (which will
 /// not force-close the same channels but consider them live), you may end up revoking a state for
 /// which you've already broadcasted the transaction.
+///
+/// [`ChainMonitor`]: crate::chain::chainmonitor::ChainMonitor
 pub struct ChannelManagerReadArgs<'a, Signer: 'a + Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
 	where M::Target: chain::Watch<Signer>,
 		T::Target: BroadcasterInterface,
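In code, the renumbered restart steps read roughly as follows. This is a loose outline rather than copy-paste LDK: `persister.read_channel_monitors` and the surrounding setup are placeholders, while `ChannelManagerReadArgs`, the `<(BlockHash, ChannelManager<...>)>::read` call, and `chain::Watch::watch_channel` are the interfaces the doc text above names.

    // Loose outline of steps 1-7; `persister` and friends are placeholders.
    // 1) Deserialize all stored ChannelMonitors (placeholder persister API,
    //    assumed here to yield (funding_outpoint, monitor) pairs).
    let mut monitors = persister.read_channel_monitors(&keys_manager)?;

    // 2) Deserialize the ChannelManager, handing it mutable monitor references.
    let read_args = ChannelManagerReadArgs::new(
        &keys_manager, &fee_estimator, &chain_monitor, &broadcaster, &logger,
        user_config, monitors.iter_mut().map(|(_, mon)| mon).collect());
    let (best_block_hash, channel_manager) =
        <(BlockHash, ChannelManager<_, _, _, _, _, _>)>::read(&mut reader, read_args)?;

    // 3) If filtering blocks, register each monitor's funding outpoint and
    //    watched outputs with your chain::Filter.
    // 4)+5) Replay chain data: reconnect blocks on each ChannelMonitor, then
    //    disconnect/connect blocks on the ChannelManager, up to the chain tip.

    // 6)+7) With a ChainMonitor, re-persisting the latest monitor state and
    // handing the monitor off both happen as part of watch_channel.
    for (funding_txo, monitor) in monitors.drain(..) {
        chain_monitor.watch_channel(funding_txo, monitor)?;
    }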