Ensure all HTLCs for a claimed payment are claimed on startup
[rust-lightning] / lightning / src / ln / channelmanager.rs
index 2ce466c9ac6d4efe7c0cea8a2af324a7d4ecaf19..a62f44806d45ad617f369b47018538b1c31747c7 100644
@@ -164,7 +164,7 @@ enum OnionPayload {
        Invoice {
                /// This is only here for backwards-compatibility in serialization, in the future it can be
                /// removed, breaking clients running 0.0.106 and earlier.
-               _legacy_hop_data: msgs::FinalOnionHopData,
+               _legacy_hop_data: Option<msgs::FinalOnionHopData>,
        },
        /// Contains the payer-provided preimage.
        Spontaneous(PaymentPreimage),
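
To see why the field becomes an `Option`: writers on 0.0.107+ may have no legacy hop data left to store, while readers of state written by 0.0.106 and earlier still find it. A minimal sketch with stand-in types (not the real `lightning` crate items):

```rust
// Stand-in types, for illustration only.
struct FinalOnionHopData { payment_secret: [u8; 32], total_msat: u64 }

enum OnionPayload {
    // None when written by 0.0.107+; Some when deserialized from older state.
    Invoice { _legacy_hop_data: Option<FinalOnionHopData> },
    Spontaneous([u8; 32]), // keysend payment preimage
}

fn legacy_total_msat(payload: &OnionPayload) -> Option<u64> {
    match payload {
        OnionPayload::Invoice { _legacy_hop_data: Some(data) } => Some(data.total_msat),
        _ => None,
    }
}

fn main() {
    let modern = OnionPayload::Invoice { _legacy_hop_data: None };
    assert_eq!(legacy_total_msat(&modern), None);
}
```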
@@ -418,7 +418,7 @@ pub(super) struct ChannelHolder<Signer: Sign> {
        /// Note that while this is held in the same mutex as the channels themselves, no consistency
        /// guarantees are made about the channels given here actually existing anymore by the time you
        /// go to read them!
-       claimable_htlcs: HashMap<PaymentHash, Vec<ClaimableHTLC>>,
+       claimable_htlcs: HashMap<PaymentHash, (events::PaymentPurpose, Vec<ClaimableHTLC>)>,
        /// Messages to send to peers - pushed to in the same lock that they are generated in (except
        /// for broadcast messages, where ordering isn't as strict).
        pub(super) pending_msg_events: Vec<MessageSendEvent>,
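
Storing the purpose alongside the HTLC list means it is created once, when the first HTLC of a payment arrives, and shared by every later HTLC with the same hash. A self-contained sketch (stand-in types) of the access pattern `check_total_value!` uses below:

```rust
use std::collections::HashMap;

// Stand-in types; the real map lives in ChannelHolder.
enum PaymentPurpose { InvoicePayment, SpontaneousPayment }
struct ClaimableHTLC { value_msat: u64 }
type PaymentHash = [u8; 32];

fn main() {
    let mut claimable_htlcs: HashMap<PaymentHash, (PaymentPurpose, Vec<ClaimableHTLC>)> =
        HashMap::new();
    let payment_hash = [0u8; 32];
    // The purpose is built lazily, exactly once per payment hash.
    let (_purpose, htlcs) = claimable_htlcs.entry(payment_hash)
        .or_insert_with(|| (PaymentPurpose::InvoicePayment, Vec::new()));
    htlcs.push(ClaimableHTLC { value_msat: 1_000 });
    assert_eq!(claimable_htlcs[&payment_hash].1.len(), 1);
}
```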
@@ -1769,17 +1769,18 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                });
        }
 
-       fn close_channel_internal(&self, channel_id: &[u8; 32], target_feerate_sats_per_1000_weight: Option<u32>) -> Result<(), APIError> {
+       fn close_channel_internal(&self, channel_id: &[u8; 32], counterparty_node_id: &PublicKey, target_feerate_sats_per_1000_weight: Option<u32>) -> Result<(), APIError> {
                let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);
 
-               let counterparty_node_id;
                let mut failed_htlcs: Vec<(HTLCSource, PaymentHash)>;
                let result: Result<(), _> = loop {
                        let mut channel_state_lock = self.channel_state.lock().unwrap();
                        let channel_state = &mut *channel_state_lock;
                        match channel_state.by_id.entry(channel_id.clone()) {
                                hash_map::Entry::Occupied(mut chan_entry) => {
-                                       counterparty_node_id = chan_entry.get().get_counterparty_node_id();
+                                       if *counterparty_node_id != chan_entry.get().get_counterparty_node_id() {
+                                               return Err(APIError::APIMisuseError { err: "The passed counterparty_node_id doesn't match the channel's counterparty node_id".to_owned() });
+                                       }
                                        let per_peer_state = self.per_peer_state.read().unwrap();
                                        let (shutdown_msg, monitor_update, htlcs) = match per_peer_state.get(&counterparty_node_id) {
                                                Some(peer_state) => {
@@ -1804,7 +1805,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                        }
 
                                        channel_state.pending_msg_events.push(events::MessageSendEvent::SendShutdown {
-                                               node_id: counterparty_node_id,
+                                               node_id: *counterparty_node_id,
                                                msg: shutdown_msg
                                        });
 
@@ -1827,7 +1828,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                        self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), htlc_source.0, &htlc_source.1, HTLCFailReason::Reason { failure_code: 0x4000 | 8, data: Vec::new() });
                }
 
-               let _ = handle_error!(self, result, counterparty_node_id);
+               let _ = handle_error!(self, result, *counterparty_node_id);
                Ok(())
        }
 
@@ -1848,8 +1849,8 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
        /// [`ChannelConfig::force_close_avoidance_max_fee_satoshis`]: crate::util::config::ChannelConfig::force_close_avoidance_max_fee_satoshis
        /// [`Background`]: crate::chain::chaininterface::ConfirmationTarget::Background
        /// [`Normal`]: crate::chain::chaininterface::ConfirmationTarget::Normal
-       pub fn close_channel(&self, channel_id: &[u8; 32]) -> Result<(), APIError> {
-               self.close_channel_internal(channel_id, None)
+       pub fn close_channel(&self, channel_id: &[u8; 32], counterparty_node_id: &PublicKey) -> Result<(), APIError> {
+               self.close_channel_internal(channel_id, counterparty_node_id, None)
        }
 
        /// Begins the process of closing a channel. After this call (plus some timeout), no new HTLCs
@@ -1871,8 +1872,8 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
        /// [`ChannelConfig::force_close_avoidance_max_fee_satoshis`]: crate::util::config::ChannelConfig::force_close_avoidance_max_fee_satoshis
        /// [`Background`]: crate::chain::chaininterface::ConfirmationTarget::Background
        /// [`Normal`]: crate::chain::chaininterface::ConfirmationTarget::Normal
-       pub fn close_channel_with_target_feerate(&self, channel_id: &[u8; 32], target_feerate_sats_per_1000_weight: u32) -> Result<(), APIError> {
-               self.close_channel_internal(channel_id, Some(target_feerate_sats_per_1000_weight))
+       pub fn close_channel_with_target_feerate(&self, channel_id: &[u8; 32], counterparty_node_id: &PublicKey, target_feerate_sats_per_1000_weight: u32) -> Result<(), APIError> {
+               self.close_channel_internal(channel_id, counterparty_node_id, Some(target_feerate_sats_per_1000_weight))
        }
 
        #[inline]
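
Both cooperative-close entry points now identify the channel by id and counterparty, and a mismatch returns `APIMisuseError` instead of acting on another peer's channel. A hedged usage sketch, assuming an already-initialized `channel_manager`:

```rust
// `channel_manager` is an initialized ChannelManager from the lightning crate;
// error handling is elided.
for chan in channel_manager.list_channels() {
    // Plain cooperative close:
    let _ = channel_manager.close_channel(&chan.channel_id, &chan.counterparty.node_id);
    // Or with an explicit closing-tx feerate (sat per 1000 weight):
    // let _ = channel_manager.close_channel_with_target_feerate(
    //     &chan.channel_id, &chan.counterparty.node_id, 253);
}
```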
@@ -1891,22 +1892,18 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                }
        }
 
-       /// `peer_node_id` should be set when we receive a message from a peer, but not set when the
+       /// `peer_msg` should be set when we receive a message from a peer, but not set when the
        /// user closes, which will be re-exposed as the `ChannelClosed` reason.
-       fn force_close_channel_with_peer(&self, channel_id: &[u8; 32], peer_node_id: Option<&PublicKey>, peer_msg: Option<&String>) -> Result<PublicKey, APIError> {
+       fn force_close_channel_with_peer(&self, channel_id: &[u8; 32], peer_node_id: &PublicKey, peer_msg: Option<&String>) -> Result<PublicKey, APIError> {
                let mut chan = {
                        let mut channel_state_lock = self.channel_state.lock().unwrap();
                        let channel_state = &mut *channel_state_lock;
                        if let hash_map::Entry::Occupied(chan) = channel_state.by_id.entry(channel_id.clone()) {
-                               if let Some(node_id) = peer_node_id {
-                                       if chan.get().get_counterparty_node_id() != *node_id {
-                                               return Err(APIError::ChannelUnavailable{err: "No such channel".to_owned()});
-                                       }
+                               if chan.get().get_counterparty_node_id() != *peer_node_id {
+                                       return Err(APIError::ChannelUnavailable{err: "No such channel".to_owned()});
                                }
-                               if peer_node_id.is_some() {
-                                       if let Some(peer_msg) = peer_msg {
-                                               self.issue_channel_close_events(chan.get(),ClosureReason::CounterpartyForceClosed { peer_msg: peer_msg.to_string() });
-                                       }
+                               if let Some(peer_msg) = peer_msg {
+                                       self.issue_channel_close_events(chan.get(), ClosureReason::CounterpartyForceClosed { peer_msg: peer_msg.to_string() });
                                } else {
                                        self.issue_channel_close_events(chan.get(),ClosureReason::HolderForceClosed);
                                }
@@ -1928,10 +1925,12 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
        }
 
        /// Force closes a channel, immediately broadcasting the latest local commitment transaction to
-       /// the chain and rejecting new HTLCs on the given channel. Fails if channel_id is unknown to the manager.
-       pub fn force_close_channel(&self, channel_id: &[u8; 32]) -> Result<(), APIError> {
+       /// the chain and rejecting new HTLCs on the given channel. Fails if `channel_id` is unknown to
+       /// the manager, or if the `counterparty_node_id` isn't the counterparty of the corresponding
+       /// channel.
+       pub fn force_close_channel(&self, channel_id: &[u8; 32], counterparty_node_id: &PublicKey) -> Result<(), APIError> {
                let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);
-               match self.force_close_channel_with_peer(channel_id, None, None) {
+               match self.force_close_channel_with_peer(channel_id, counterparty_node_id, None) {
                        Ok(counterparty_node_id) => {
                                self.channel_state.lock().unwrap().pending_msg_events.push(
                                        events::MessageSendEvent::HandleError {
@@ -1951,7 +1950,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
        /// for each to the chain and rejecting new HTLCs on each.
        pub fn force_close_all_channels(&self) {
                for chan in self.list_channels() {
-                       let _ = self.force_close_channel(&chan.channel_id);
+                       let _ = self.force_close_channel(&chan.channel_id, &chan.counterparty.node_id);
                }
        }
 
@@ -2687,8 +2686,9 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
 
        /// Handles the generation of a funding transaction, optionally (for tests) with a function
        /// which checks the correctness of the funding transaction given the associated channel.
-       fn funding_transaction_generated_intern<FundingOutput: Fn(&Channel<Signer>, &Transaction) -> Result<OutPoint, APIError>>
-                       (&self, temporary_channel_id: &[u8; 32], funding_transaction: Transaction, find_funding_output: FundingOutput) -> Result<(), APIError> {
+       fn funding_transaction_generated_intern<FundingOutput: Fn(&Channel<Signer>, &Transaction) -> Result<OutPoint, APIError>>(
+               &self, temporary_channel_id: &[u8; 32], _counterparty_node_id: &PublicKey, funding_transaction: Transaction, find_funding_output: FundingOutput
+       ) -> Result<(), APIError> {
                let (chan, msg) = {
                        let (res, chan) = match self.channel_state.lock().unwrap().by_id.remove(temporary_channel_id) {
                                Some(mut chan) => {
@@ -2729,8 +2729,8 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
        }
 
        #[cfg(test)]
-       pub(crate) fn funding_transaction_generated_unchecked(&self, temporary_channel_id: &[u8; 32], funding_transaction: Transaction, output_index: u16) -> Result<(), APIError> {
-               self.funding_transaction_generated_intern(temporary_channel_id, funding_transaction, |_, tx| {
+       pub(crate) fn funding_transaction_generated_unchecked(&self, temporary_channel_id: &[u8; 32], counterparty_node_id: &PublicKey, funding_transaction: Transaction, output_index: u16) -> Result<(), APIError> {
+               self.funding_transaction_generated_intern(temporary_channel_id, counterparty_node_id, funding_transaction, |_, tx| {
                        Ok(OutPoint { txid: tx.txid(), index: output_index })
                })
        }
@@ -2757,7 +2757,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
        ///
        /// [`Event::FundingGenerationReady`]: crate::util::events::Event::FundingGenerationReady
        /// [`Event::ChannelClosed`]: crate::util::events::Event::ChannelClosed
-       pub fn funding_transaction_generated(&self, temporary_channel_id: &[u8; 32], funding_transaction: Transaction) -> Result<(), APIError> {
+       pub fn funding_transaction_generated(&self, temporary_channel_id: &[u8; 32], counterparty_node_id: &PublicKey, funding_transaction: Transaction) -> Result<(), APIError> {
                let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);
 
                for inp in funding_transaction.input.iter() {
@@ -2767,7 +2767,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                });
                        }
                }
-               self.funding_transaction_generated_intern(temporary_channel_id, funding_transaction, |chan, tx| {
+               self.funding_transaction_generated_intern(temporary_channel_id, counterparty_node_id, funding_transaction, |chan, tx| {
                        let mut output_index = None;
                        let expected_spk = chan.get_funding_redeemscript().to_v0_p2wsh();
                        for (idx, outp) in tx.output.iter().enumerate() {
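
The funding path threads the peer id the same way, and `Event::FundingGenerationReady` gains a matching `counterparty_node_id` field in the hunk at line 4212 below, so a handler can forward it verbatim. A hedged sketch (the event loop and the `build_funding_tx` helper are hypothetical):

```rust
// Hypothetical event handler; `channel_manager` and `build_funding_tx`
// are assumptions, not part of this diff.
match event {
    Event::FundingGenerationReady {
        temporary_channel_id, counterparty_node_id, channel_value_satoshis,
        output_script, ..
    } => {
        let funding_tx = build_funding_tx(channel_value_satoshis, &output_script);
        let _ = channel_manager.funding_transaction_generated(
            &temporary_channel_id, &counterparty_node_id, funding_tx);
    },
    _ => {},
}
```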
@@ -3098,7 +3098,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                                                        prev_funding_outpoint } => {
                                                                let (cltv_expiry, onion_payload, payment_data, phantom_shared_secret) = match routing {
                                                                        PendingHTLCRouting::Receive { payment_data, incoming_cltv_expiry, phantom_shared_secret } => {
-                                                                               let _legacy_hop_data = payment_data.clone();
+                                                                               let _legacy_hop_data = Some(payment_data.clone());
                                                                                (incoming_cltv_expiry, OnionPayload::Invoice { _legacy_hop_data }, Some(payment_data), phantom_shared_secret)
                                                                        },
                                                                        PendingHTLCRouting::ReceiveKeysend { payment_preimage, incoming_cltv_expiry } =>
@@ -3143,8 +3143,14 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                                                macro_rules! check_total_value {
                                                                        ($payment_data: expr, $payment_preimage: expr) => {{
                                                                                let mut payment_received_generated = false;
-                                                                               let htlcs = channel_state.claimable_htlcs.entry(payment_hash)
-                                                                                       .or_insert(Vec::new());
+                                                                               let purpose = || {
+                                                                                       events::PaymentPurpose::InvoicePayment {
+                                                                                               payment_preimage: $payment_preimage,
+                                                                                               payment_secret: $payment_data.payment_secret,
+                                                                                       }
+                                                                               };
+                                                                               let (_, htlcs) = channel_state.claimable_htlcs.entry(payment_hash)
+                                                                                       .or_insert_with(|| (purpose(), Vec::new()));
                                                                                if htlcs.len() == 1 {
                                                                                        if let OnionPayload::Spontaneous(_) = htlcs[0].onion_payload {
                                                                                                log_trace!(self.logger, "Failing new HTLC with payment_hash {} as we already had an existing keysend HTLC with the same payment hash", log_bytes!(payment_hash.0));
@@ -3175,10 +3181,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                                                                        htlcs.push(claimable_htlc);
                                                                                        new_events.push(events::Event::PaymentReceived {
                                                                                                payment_hash,
-                                                                                               purpose: events::PaymentPurpose::InvoicePayment {
-                                                                                                       payment_preimage: $payment_preimage,
-                                                                                                       payment_secret: $payment_data.payment_secret,
-                                                                                               },
+                                                                                               purpose: purpose(),
                                                                                                amt: total_value,
                                                                                        });
                                                                                        payment_received_generated = true;
@@ -3216,11 +3219,12 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                                                                        OnionPayload::Spontaneous(preimage) => {
                                                                                                match channel_state.claimable_htlcs.entry(payment_hash) {
                                                                                                        hash_map::Entry::Vacant(e) => {
-                                                                                                               e.insert(vec![claimable_htlc]);
+                                                                                                               let purpose = events::PaymentPurpose::SpontaneousPayment(preimage);
+                                                                                                               e.insert((purpose.clone(), vec![claimable_htlc]));
                                                                                                                new_events.push(events::Event::PaymentReceived {
                                                                                                                        payment_hash,
                                                                                                                        amt: amt_to_forward,
-                                                                                                                       purpose: events::PaymentPurpose::SpontaneousPayment(preimage),
+                                                                                                                       purpose,
                                                                                                                });
                                                                                                        },
                                                                                                        hash_map::Entry::Occupied(_) => {
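
On the consuming side, the purpose delivered with `Event::PaymentReceived` is what distinguishes invoice payments from keysend. A sketch of a handler (surrounding plumbing assumed; variants and fields as they appear in this diff):

```rust
// Hypothetical handler over the lightning crate's Event and PaymentPurpose.
match event {
    Event::PaymentReceived { payment_hash, amt, purpose } => match purpose {
        PaymentPurpose::InvoicePayment { payment_preimage, payment_secret } => {
            // payment_preimage is an Option: it can be None when the invoice
            // was built around an externally-supplied payment hash.
            let _ = (payment_hash, amt, payment_preimage, payment_secret);
        },
        PaymentPurpose::SpontaneousPayment(preimage) => {
            // Keysend: the payer chose the preimage, so it is always known.
            let _ = (payment_hash, amt, preimage);
        },
    },
    _ => {},
}
```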
@@ -3459,7 +3463,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                        true
                                });
 
-                               channel_state.claimable_htlcs.retain(|payment_hash, htlcs| {
+                               channel_state.claimable_htlcs.retain(|payment_hash, (_, htlcs)| {
                                        if htlcs.is_empty() {
                                                // This should be unreachable
                                                debug_assert!(false);
@@ -3503,7 +3507,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
 
                let mut channel_state = Some(self.channel_state.lock().unwrap());
                let removed_source = channel_state.as_mut().unwrap().claimable_htlcs.remove(payment_hash);
-               if let Some(mut sources) = removed_source {
+               if let Some((_, mut sources)) = removed_source {
                        for htlc in sources.drain(..) {
                                if channel_state.is_none() { channel_state = Some(self.channel_state.lock().unwrap()); }
                                let mut htlc_msat_height_data = byte_utils::be64_to_array(htlc.value).to_vec();
@@ -3803,7 +3807,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
 
                let mut channel_state = Some(self.channel_state.lock().unwrap());
                let removed_source = channel_state.as_mut().unwrap().claimable_htlcs.remove(&payment_hash);
-               if let Some(mut sources) = removed_source {
+               if let Some((_, mut sources)) = removed_source {
                        assert!(!sources.is_empty());
 
                        // If we are claiming an MPP payment, we have to take special care to ensure that each
@@ -3958,7 +3962,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                }
        }
 
-       fn claim_funds_internal(&self, mut channel_state_lock: MutexGuard<ChannelHolder<Signer>>, source: HTLCSource, payment_preimage: PaymentPreimage, forwarded_htlc_value_msat: Option<u64>, from_onchain: bool) {
+       fn claim_funds_internal(&self, mut channel_state_lock: MutexGuard<ChannelHolder<Signer>>, source: HTLCSource, payment_preimage: PaymentPreimage, forwarded_htlc_value_msat: Option<u64>, from_onchain: bool, next_channel_id: [u8; 32]) {
                match source {
                        HTLCSource::OutboundRoute { session_priv, payment_id, path, .. } => {
                                mem::drop(channel_state_lock);
@@ -4049,12 +4053,14 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                                } else { None };
 
                                                let mut pending_events = self.pending_events.lock().unwrap();
+                                               let prev_channel_id = Some(prev_outpoint.to_channel_id());
+                                               let next_channel_id = Some(next_channel_id);
 
-                                               let source_channel_id = Some(prev_outpoint.to_channel_id());
                                                pending_events.push(events::Event::PaymentForwarded {
-                                                       source_channel_id,
                                                        fee_earned_msat,
                                                        claim_from_onchain_tx: from_onchain,
+                                                       prev_channel_id,
+                                                       next_channel_id,
                                                });
                                        }
                                }
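
`source_channel_id` becomes `prev_channel_id`, and the newly threaded `next_channel_id` records the channel the preimage arrived on, so a forward event now names both hops. Both are `Option`s because events serialized by older versions lack them. A sketch of a consumer:

```rust
// Hypothetical handler; field names as written by this diff.
match event {
    Event::PaymentForwarded {
        prev_channel_id, next_channel_id, fee_earned_msat, claim_from_onchain_tx,
    } => {
        println!("forwarded {:?} -> {:?}: fee {:?} msat (claimed on-chain: {})",
            prev_channel_id, next_channel_id, fee_earned_msat, claim_from_onchain_tx);
    },
    _ => {},
}
```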
@@ -4112,7 +4118,9 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
        /// Called to accept a request to open a channel after [`Event::OpenChannelRequest`] has been
        /// triggered.
        ///
-       /// The `temporary_channel_id` parameter indicates which inbound channel should be accepted.
+       /// The `temporary_channel_id` parameter indicates which inbound channel should be accepted,
+       /// and the `counterparty_node_id` parameter is the id of the peer which has requested to open
+       /// the channel.
        ///
        /// For inbound channels, the `user_channel_id` parameter will be provided back in
        /// [`Event::ChannelClosed::user_channel_id`] to allow tracking of which events correspond
@@ -4120,7 +4128,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
        ///
        /// [`Event::OpenChannelRequest`]: events::Event::OpenChannelRequest
        /// [`Event::ChannelClosed::user_channel_id`]: events::Event::ChannelClosed::user_channel_id
-       pub fn accept_inbound_channel(&self, temporary_channel_id: &[u8; 32], user_channel_id: u64) -> Result<(), APIError> {
+       pub fn accept_inbound_channel(&self, temporary_channel_id: &[u8; 32], counterparty_node_id: &PublicKey, user_channel_id: u64) -> Result<(), APIError> {
                let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);
 
                let mut channel_state_lock = self.channel_state.lock().unwrap();
@@ -4130,6 +4138,9 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                if !channel.get().inbound_is_awaiting_accept() {
                                        return Err(APIError::APIMisuseError { err: "The channel isn't currently awaiting to be accepted.".to_owned() });
                                }
+                               if *counterparty_node_id != channel.get().get_counterparty_node_id() {
+                                       return Err(APIError::APIMisuseError { err: "The passed counterparty_node_id doesn't match the channel's counterparty node_id".to_owned() });
+                               }
                                channel_state.pending_msg_events.push(events::MessageSendEvent::SendAcceptChannel {
                                        node_id: channel.get().get_counterparty_node_id(),
                                        msg: channel.get_mut().accept_inbound_channel(user_channel_id),
@@ -4212,6 +4223,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                let mut pending_events = self.pending_events.lock().unwrap();
                pending_events.push(events::Event::FundingGenerationReady {
                        temporary_channel_id: msg.temporary_channel_id,
+                       counterparty_node_id: *counterparty_node_id,
                        channel_value_satoshis: value,
                        output_script,
                        user_channel_id: user_id,
@@ -4507,7 +4519,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel".to_owned(), msg.channel_id))
                        }
                };
-               self.claim_funds_internal(channel_lock, htlc_source, msg.payment_preimage.clone(), Some(forwarded_htlc_value), false);
+               self.claim_funds_internal(channel_lock, htlc_source, msg.payment_preimage.clone(), Some(forwarded_htlc_value), false, msg.channel_id);
                Ok(())
        }
 
@@ -4827,48 +4839,50 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                let mut failed_channels = Vec::new();
                let mut pending_monitor_events = self.chain_monitor.release_pending_monitor_events();
                let has_pending_monitor_events = !pending_monitor_events.is_empty();
-               for monitor_event in pending_monitor_events.drain(..) {
-                       match monitor_event {
-                               MonitorEvent::HTLCEvent(htlc_update) => {
-                                       if let Some(preimage) = htlc_update.payment_preimage {
-                                               log_trace!(self.logger, "Claiming HTLC with preimage {} from our monitor", log_bytes!(preimage.0));
-                                               self.claim_funds_internal(self.channel_state.lock().unwrap(), htlc_update.source, preimage, htlc_update.onchain_value_satoshis.map(|v| v * 1000), true);
-                                       } else {
-                                               log_trace!(self.logger, "Failing HTLC with hash {} from our monitor", log_bytes!(htlc_update.payment_hash.0));
-                                               self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), htlc_update.source, &htlc_update.payment_hash, HTLCFailReason::Reason { failure_code: 0x4000 | 8, data: Vec::new() });
-                                       }
-                               },
-                               MonitorEvent::CommitmentTxConfirmed(funding_outpoint) |
-                               MonitorEvent::UpdateFailed(funding_outpoint) => {
-                                       let mut channel_lock = self.channel_state.lock().unwrap();
-                                       let channel_state = &mut *channel_lock;
-                                       let by_id = &mut channel_state.by_id;
-                                       let pending_msg_events = &mut channel_state.pending_msg_events;
-                                       if let hash_map::Entry::Occupied(chan_entry) = by_id.entry(funding_outpoint.to_channel_id()) {
-                                               let mut chan = remove_channel!(self, channel_state, chan_entry);
-                                               failed_channels.push(chan.force_shutdown(false));
-                                               if let Ok(update) = self.get_channel_update_for_broadcast(&chan) {
-                                                       pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
-                                                               msg: update
+               for (funding_outpoint, mut monitor_events) in pending_monitor_events.drain(..) {
+                       for monitor_event in monitor_events.drain(..) {
+                               match monitor_event {
+                                       MonitorEvent::HTLCEvent(htlc_update) => {
+                                               if let Some(preimage) = htlc_update.payment_preimage {
+                                                       log_trace!(self.logger, "Claiming HTLC with preimage {} from our monitor", log_bytes!(preimage.0));
+                                                       self.claim_funds_internal(self.channel_state.lock().unwrap(), htlc_update.source, preimage, htlc_update.onchain_value_satoshis.map(|v| v * 1000), true, funding_outpoint.to_channel_id());
+                                               } else {
+                                                       log_trace!(self.logger, "Failing HTLC with hash {} from our monitor", log_bytes!(htlc_update.payment_hash.0));
+                                                       self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), htlc_update.source, &htlc_update.payment_hash, HTLCFailReason::Reason { failure_code: 0x4000 | 8, data: Vec::new() });
+                                               }
+                                       },
+                                       MonitorEvent::CommitmentTxConfirmed(funding_outpoint) |
+                                       MonitorEvent::UpdateFailed(funding_outpoint) => {
+                                               let mut channel_lock = self.channel_state.lock().unwrap();
+                                               let channel_state = &mut *channel_lock;
+                                               let by_id = &mut channel_state.by_id;
+                                               let pending_msg_events = &mut channel_state.pending_msg_events;
+                                               if let hash_map::Entry::Occupied(chan_entry) = by_id.entry(funding_outpoint.to_channel_id()) {
+                                                       let mut chan = remove_channel!(self, channel_state, chan_entry);
+                                                       failed_channels.push(chan.force_shutdown(false));
+                                                       if let Ok(update) = self.get_channel_update_for_broadcast(&chan) {
+                                                               pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
+                                                                       msg: update
+                                                               });
+                                                       }
+                                                       let reason = if let MonitorEvent::UpdateFailed(_) = monitor_event {
+                                                               ClosureReason::ProcessingError { err: "Failed to persist ChannelMonitor update during chain sync".to_string() }
+                                                       } else {
+                                                               ClosureReason::CommitmentTxConfirmed
+                                                       };
+                                                       self.issue_channel_close_events(&chan, reason);
+                                                       pending_msg_events.push(events::MessageSendEvent::HandleError {
+                                                               node_id: chan.get_counterparty_node_id(),
+                                                               action: msgs::ErrorAction::SendErrorMessage {
+                                                                       msg: msgs::ErrorMessage { channel_id: chan.channel_id(), data: "Channel force-closed".to_owned() }
+                                                               },
                                                        });
                                                }
-                                               let reason = if let MonitorEvent::UpdateFailed(_) = monitor_event {
-                                                       ClosureReason::ProcessingError { err: "Failed to persist ChannelMonitor update during chain sync".to_string() }
-                                               } else {
-                                                       ClosureReason::CommitmentTxConfirmed
-                                               };
-                                               self.issue_channel_close_events(&chan, reason);
-                                               pending_msg_events.push(events::MessageSendEvent::HandleError {
-                                                       node_id: chan.get_counterparty_node_id(),
-                                                       action: msgs::ErrorAction::SendErrorMessage {
-                                                               msg: msgs::ErrorMessage { channel_id: chan.channel_id(), data: "Channel force-closed".to_owned() }
-                                                       },
-                                               });
-                                       }
-                               },
-                               MonitorEvent::UpdateCompleted { funding_txo, monitor_update_id } => {
-                                       self.channel_monitor_updated(&funding_txo, monitor_update_id);
-                               },
+                                       },
+                                       MonitorEvent::UpdateCompleted { funding_txo, monitor_update_id } => {
+                                               self.channel_monitor_updated(&funding_txo, monitor_update_id);
+                                       },
+                               }
                        }
                }
 
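Monitor events now come out of the chain monitor batched under the funding outpoint of the `ChannelMonitor` that produced them, which is how `claim_funds_internal` learns its new `next_channel_id` argument. A minimal sketch of the grouped shape (stand-in types):

```rust
// Stand-in types mirroring the grouped release of monitor events.
#[derive(Clone, Copy)]
struct OutPoint { txid: [u8; 32], index: u16 }
enum MonitorEvent { HTLCEvent, CommitmentTxConfirmed(OutPoint) }

fn process(mut pending: Vec<(OutPoint, Vec<MonitorEvent>)>) {
    for (funding_outpoint, mut monitor_events) in pending.drain(..) {
        for monitor_event in monitor_events.drain(..) {
            // In the real code, funding_outpoint.to_channel_id() names the
            // channel every event in this batch belongs to.
            match monitor_event {
                MonitorEvent::HTLCEvent => { let _ = funding_outpoint; },
                MonitorEvent::CommitmentTxConfirmed(_outpoint) => {},
            }
        }
    }
}

fn main() {
    let outpoint = OutPoint { txid: [0; 32], index: 0 };
    process(vec![(outpoint, vec![MonitorEvent::HTLCEvent])]);
}
```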
@@ -5530,7 +5544,7 @@ where
                        });
 
                        if let Some(height) = height_opt {
-                               channel_state.claimable_htlcs.retain(|payment_hash, htlcs| {
+                               channel_state.claimable_htlcs.retain(|payment_hash, (_, htlcs)| {
                                        htlcs.retain(|htlc| {
                                                // If height is approaching the number of blocks we think it takes us to get
                                                // our commitment transaction confirmed before the HTLC expires, plus the
@@ -5698,39 +5712,21 @@ impl<Signer: Sign, M: Deref , T: Deref , K: Deref , F: Deref , L: Deref >
                        let channel_state = &mut *channel_state_lock;
                        let pending_msg_events = &mut channel_state.pending_msg_events;
                        let short_to_id = &mut channel_state.short_to_id;
-                       if no_connection_possible {
-                               log_debug!(self.logger, "Failing all channels with {} due to no_connection_possible", log_pubkey!(counterparty_node_id));
-                               channel_state.by_id.retain(|_, chan| {
-                                       if chan.get_counterparty_node_id() == *counterparty_node_id {
+                       log_debug!(self.logger, "Marking channels with {} disconnected and generating channel_updates. We believe we {} make future connections to this peer.",
+                               log_pubkey!(counterparty_node_id), if no_connection_possible { "cannot" } else { "can" });
+                       channel_state.by_id.retain(|_, chan| {
+                               if chan.get_counterparty_node_id() == *counterparty_node_id {
+                                       chan.remove_uncommitted_htlcs_and_mark_paused(&self.logger);
+                                       if chan.is_shutdown() {
                                                update_maps_on_chan_removal!(self, short_to_id, chan);
-                                               failed_channels.push(chan.force_shutdown(true));
-                                               if let Ok(update) = self.get_channel_update_for_broadcast(&chan) {
-                                                       pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
-                                                               msg: update
-                                                       });
-                                               }
                                                self.issue_channel_close_events(chan, ClosureReason::DisconnectedPeer);
-                                               false
+                                               return false;
                                        } else {
-                                               true
+                                               no_channels_remain = false;
                                        }
-                               });
-                       } else {
-                               log_debug!(self.logger, "Marking channels with {} disconnected and generating channel_updates", log_pubkey!(counterparty_node_id));
-                               channel_state.by_id.retain(|_, chan| {
-                                       if chan.get_counterparty_node_id() == *counterparty_node_id {
-                                               chan.remove_uncommitted_htlcs_and_mark_paused(&self.logger);
-                                               if chan.is_shutdown() {
-                                                       update_maps_on_chan_removal!(self, short_to_id, chan);
-                                                       self.issue_channel_close_events(chan, ClosureReason::DisconnectedPeer);
-                                                       return false;
-                                               } else {
-                                                       no_channels_remain = false;
-                                               }
-                                       }
-                                       true
-                               })
-                       }
+                               }
+                               true
+                       });
                        pending_msg_events.retain(|msg| {
                                match msg {
                                        &events::MessageSendEvent::SendAcceptChannel { ref node_id, .. } => node_id != counterparty_node_id,
@@ -5814,7 +5810,7 @@ impl<Signer: Sign, M: Deref , T: Deref , K: Deref , F: Deref , L: Deref >
                        for chan in self.list_channels() {
                                if chan.counterparty.node_id == *counterparty_node_id {
                                        // Untrusted messages from peer, we throw away the error if id points to a non-existent channel
-                                       let _ = self.force_close_channel_with_peer(&chan.channel_id, Some(counterparty_node_id), Some(&msg.data));
+                                       let _ = self.force_close_channel_with_peer(&chan.channel_id, counterparty_node_id, Some(&msg.data));
                                }
                        }
                } else {
@@ -5836,7 +5832,7 @@ impl<Signer: Sign, M: Deref , T: Deref , K: Deref , F: Deref , L: Deref >
                        }
 
                        // Untrusted messages from peer, we throw away the error if id points to a non-existent channel
-                       let _ = self.force_close_channel_with_peer(&msg.channel_id, Some(counterparty_node_id), Some(&msg.data));
+                       let _ = self.force_close_channel_with_peer(&msg.channel_id, counterparty_node_id, Some(&msg.data));
                }
        }
 }
@@ -6069,13 +6065,9 @@ impl_writeable_tlv_based!(HTLCPreviousHopData, {
 
 impl Writeable for ClaimableHTLC {
        fn write<W: Writer>(&self, writer: &mut W) -> Result<(), io::Error> {
-               let payment_data = match &self.onion_payload {
-                       OnionPayload::Invoice { _legacy_hop_data } => Some(_legacy_hop_data),
-                       _ => None,
-               };
-               let keysend_preimage = match self.onion_payload {
-                       OnionPayload::Invoice { .. } => None,
-                       OnionPayload::Spontaneous(preimage) => Some(preimage.clone()),
+               let (payment_data, keysend_preimage) = match &self.onion_payload {
+                       OnionPayload::Invoice { _legacy_hop_data } => (_legacy_hop_data.as_ref(), None),
+                       OnionPayload::Spontaneous(preimage) => (None, Some(preimage)),
                };
                write_tlv_fields!(writer, {
                        (0, self.prev_hop, required),
@@ -6116,13 +6108,13 @@ impl Readable for ClaimableHTLC {
                                OnionPayload::Spontaneous(p)
                        },
                        None => {
-                               if payment_data.is_none() {
-                                       return Err(DecodeError::InvalidValue)
-                               }
                                if total_msat.is_none() {
+                                       if payment_data.is_none() {
+                                               return Err(DecodeError::InvalidValue)
+                                       }
                                        total_msat = Some(payment_data.as_ref().unwrap().total_msat);
                                }
-                               OnionPayload::Invoice { _legacy_hop_data: payment_data.unwrap() }
+                               OnionPayload::Invoice { _legacy_hop_data: payment_data }
                        },
                };
                Ok(Self {
@@ -6295,13 +6287,15 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> Writeable f
                        }
                }
 
+               let mut htlc_purposes: Vec<&events::PaymentPurpose> = Vec::new();
                (channel_state.claimable_htlcs.len() as u64).write(writer)?;
-               for (payment_hash, previous_hops) in channel_state.claimable_htlcs.iter() {
+               for (payment_hash, (purpose, previous_hops)) in channel_state.claimable_htlcs.iter() {
                        payment_hash.write(writer)?;
                        (previous_hops.len() as u64).write(writer)?;
                        for htlc in previous_hops.iter() {
                                htlc.write(writer)?;
                        }
+                       htlc_purposes.push(purpose);
                }
 
                let per_peer_state = self.per_peer_state.write().unwrap();
@@ -6378,6 +6372,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> Writeable f
                        (3, pending_outbound_payments, required),
                        (5, self.our_network_pubkey, required),
                        (7, self.fake_scid_rand_bytes, required),
+                       (9, htlc_purposes, vec_type),
                });
 
                Ok(())
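
The purposes travel in a new odd-numbered TLV (field 9), which pre-0.0.107 readers skip, and they are pushed in the same order the claimable-HTLC map is iterated; that ordering is exactly what the reader's `zip` relies on when it re-pairs them. A self-contained round-trip sketch of the invariant (stand-in types):

```rust
use std::collections::HashMap;

fn main() {
    // Stand-in: payment hash -> (purpose, HTLC amounts).
    let claimable: HashMap<u8, (&'static str, Vec<u64>)> =
        HashMap::from([(1, ("invoice", vec![10])), (2, ("keysend", vec![20]))]);

    // Write side: flatten the map, recording purposes in iteration order.
    let mut purposes = Vec::new();
    let mut flat = Vec::new();
    for (hash, (purpose, htlcs)) in claimable.iter() {
        flat.push((*hash, htlcs.clone()));
        purposes.push(*purpose);
    }

    // Read side: lengths must match, then zip restores the pairing.
    assert_eq!(purposes.len(), flat.len());
    let restored: HashMap<u8, (&'static str, Vec<u64>)> = purposes.into_iter()
        .zip(flat.into_iter())
        .map(|(purpose, (hash, htlcs))| (hash, (purpose, htlcs)))
        .collect();
    assert_eq!(restored, claimable);
}
```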
@@ -6596,15 +6591,15 @@ impl<'a, Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
                }
 
                let claimable_htlcs_count: u64 = Readable::read(reader)?;
-               let mut claimable_htlcs = HashMap::with_capacity(cmp::min(claimable_htlcs_count as usize, 128));
+               let mut claimable_htlcs_list = Vec::with_capacity(cmp::min(claimable_htlcs_count as usize, 128));
                for _ in 0..claimable_htlcs_count {
                        let payment_hash = Readable::read(reader)?;
                        let previous_hops_len: u64 = Readable::read(reader)?;
                        let mut previous_hops = Vec::with_capacity(cmp::min(previous_hops_len as usize, MAX_ALLOC_SIZE/mem::size_of::<ClaimableHTLC>()));
                        for _ in 0..previous_hops_len {
-                               previous_hops.push(Readable::read(reader)?);
+                               previous_hops.push(<ClaimableHTLC as Readable>::read(reader)?);
                        }
-                       claimable_htlcs.insert(payment_hash, previous_hops);
+                       claimable_htlcs_list.push((payment_hash, previous_hops));
                }
 
                let peer_count: u64 = Readable::read(reader)?;
@@ -6674,11 +6669,13 @@ impl<'a, Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
                let mut pending_outbound_payments = None;
                let mut received_network_pubkey: Option<PublicKey> = None;
                let mut fake_scid_rand_bytes: Option<[u8; 32]> = None;
+               let mut claimable_htlc_purposes = None;
                read_tlv_fields!(reader, {
                        (1, pending_outbound_payments_no_retry, option),
                        (3, pending_outbound_payments, option),
                        (5, received_network_pubkey, option),
                        (7, fake_scid_rand_bytes, option),
+                       (9, claimable_htlc_purposes, vec_type),
                });
                if fake_scid_rand_bytes.is_none() {
                        fake_scid_rand_bytes = Some(args.keys_manager.get_secure_random_bytes());
@@ -6701,7 +6698,7 @@ impl<'a, Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
                        // payments which are still in-flight via their on-chain state.
                        // We only rebuild the pending payments map if we were most recently serialized by
                        // 0.0.102+
-                       for (_, monitor) in args.channel_monitors {
+                       for (_, monitor) in args.channel_monitors.iter() {
                                if by_id.get(&monitor.get_funding_txo().0.to_channel_id()).is_none() {
                                        for (htlc_source, htlc) in monitor.get_pending_outbound_htlcs() {
                                                if let HTLCSource::OutboundRoute { payment_id, session_priv, path, payment_secret, .. } = htlc_source {
@@ -6739,6 +6736,49 @@ impl<'a, Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
                        }
                }
 
+               let inbound_pmt_key_material = args.keys_manager.get_inbound_payment_key_material();
+               let expanded_inbound_key = inbound_payment::ExpandedKey::new(&inbound_pmt_key_material);
+
+               let mut claimable_htlcs = HashMap::with_capacity(claimable_htlcs_list.len());
+               if let Some(mut purposes) = claimable_htlc_purposes {
+                       if purposes.len() != claimable_htlcs_list.len() {
+                               return Err(DecodeError::InvalidValue);
+                       }
+                       for (purpose, (payment_hash, previous_hops)) in purposes.drain(..).zip(claimable_htlcs_list.drain(..)) {
+                               claimable_htlcs.insert(payment_hash, (purpose, previous_hops));
+                       }
+               } else {
+                       // LDK versions prior to 0.0.107 did not write a `pending_htlc_purposes`, but did
+                       // include a `_legacy_hop_data` in the `OnionPayload`.
+                       for (payment_hash, previous_hops) in claimable_htlcs_list.drain(..) {
+                               if previous_hops.is_empty() {
+                                       return Err(DecodeError::InvalidValue);
+                               }
+                               let purpose = match &previous_hops[0].onion_payload {
+                                       OnionPayload::Invoice { _legacy_hop_data } => {
+                                               if let Some(hop_data) = _legacy_hop_data {
+                                                       events::PaymentPurpose::InvoicePayment {
+                                                               payment_preimage: match pending_inbound_payments.get(&payment_hash) {
+                                                                       Some(inbound_payment) => inbound_payment.payment_preimage,
+                                                                       None => match inbound_payment::verify(payment_hash, &hop_data, 0, &expanded_inbound_key, &args.logger) {
+                                                                               Ok(payment_preimage) => payment_preimage,
+                                                                               Err(()) => {
+                                                                                       log_error!(args.logger, "Failed to read claimable payment data for HTLC with payment hash {} - was not a pending inbound payment and didn't match our payment key", log_bytes!(payment_hash.0));
+                                                                                       return Err(DecodeError::InvalidValue);
+                                                                               }
+                                                                       }
+                                                               },
+                                                               payment_secret: hop_data.payment_secret,
+                                                       }
+                                               } else { return Err(DecodeError::InvalidValue); }
+                                       },
+                                       OnionPayload::Spontaneous(payment_preimage) =>
+                                               events::PaymentPurpose::SpontaneousPayment(*payment_preimage),
+                               };
+                               claimable_htlcs.insert(payment_hash, (purpose, previous_hops));
+                       }
+               }
+
                let mut secp_ctx = Secp256k1::new();
                secp_ctx.seeded_randomize(&args.keys_manager.get_secure_random_bytes());
 
@@ -6784,8 +6824,38 @@ impl<'a, Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
                        }
                }
 
-               let inbound_pmt_key_material = args.keys_manager.get_inbound_payment_key_material();
-               let expanded_inbound_key = inbound_payment::ExpandedKey::new(&inbound_pmt_key_material);
+               for (_, monitor) in args.channel_monitors.iter() {
+                       for (payment_hash, payment_preimage) in monitor.get_stored_preimages() {
+                               if let Some(claimable_htlcs) = claimable_htlcs.remove(&payment_hash) {
+                                       log_info!(args.logger, "Re-claiming HTLCs with payment hash {} due to partial-claim.", log_bytes!(payment_hash.0));
+                                       for claimable_htlc in claimable_htlcs.1 {
+                                               // Add a holding-cell claim of the payment to the Channel, which should be
+                                               // applied ~immediately on peer reconnection. Because it won't generate a
+                                               // new commitment transaction we can just provide the payment preimage to
+                                               // the corresponding ChannelMonitor and nothing else.
+                                               //
+                                               // We do so directly instead of via the normal ChannelMonitor update
+                                               // procedure as the ChainMonitor hasn't yet been initialized, implying
+                                               // we're not allowed to call it directly yet. Further, we do the update
+                                               // without incrementing the ChannelMonitor update ID as there isn't any
+                                               // reason to.
+                                               // If we were to generate a new ChannelMonitor update ID here and then
+                                               // crash before the user finishes block connect we'd end up force-closing
+                                               // this channel as well. On the flip side, there's no harm in restarting
+                                               // without the new monitor persisted - we'll end up right back here on
+                                               // restart.
+                                               let previous_channel_id = claimable_htlc.prev_hop.outpoint.to_channel_id();
+                                               if let Some(channel) = by_id.get_mut(&previous_channel_id) {
+                                                       channel.claim_htlc_while_disconnected_dropping_mon_update(claimable_htlc.prev_hop.htlc_id, payment_preimage, &args.logger);
+                                               }
+                                               if let Some(previous_hop_monitor) = args.channel_monitors.get(&claimable_htlc.prev_hop.outpoint) {
+                                                       previous_hop_monitor.provide_payment_preimage(&payment_hash, &payment_preimage, &args.tx_broadcaster, &args.fee_estimator, &args.logger);
+                                               }
+                                       }
+                               }
+                       }
+               }
+
                let channel_manager = ChannelManager {
                        genesis_hash,
                        fee_estimator: args.fee_estimator,
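
This final pass is the point of the commit: any payment hash whose preimage a `ChannelMonitor` already stored has its remaining claimable HTLCs claimed during deserialization, so a claim interrupted mid-MPP cannot strand its sister HTLCs. A stand-in sketch of the pass (the real calls are named in the comments above):

```rust
use std::collections::HashMap;

type PaymentHash = [u8; 32];
type PaymentPreimage = [u8; 32];
struct ClaimableHTLC { prev_channel_id: [u8; 32], htlc_id: u64 }

// For every preimage a monitor already knows, claim whatever HTLCs are
// still listed as claimable for that payment hash.
fn reclaim_on_startup(
    stored_preimages: &HashMap<PaymentHash, PaymentPreimage>,
    claimable_htlcs: &mut HashMap<PaymentHash, Vec<ClaimableHTLC>>,
) -> Vec<([u8; 32], u64, PaymentPreimage)> {
    let mut claims = Vec::new();
    for (payment_hash, preimage) in stored_preimages.iter() {
        if let Some(htlcs) = claimable_htlcs.remove(payment_hash) {
            for htlc in htlcs {
                // Real code: Channel::claim_htlc_while_disconnected_dropping_mon_update
                // plus ChannelMonitor::provide_payment_preimage, deliberately
                // without bumping the monitor update ID (see comments above).
                claims.push((htlc.prev_channel_id, htlc.htlc_id, *preimage));
            }
        }
    }
    claims
}

fn main() {
    let stored = HashMap::from([([7u8; 32], [9u8; 32])]);
    let mut claimable = HashMap::from([
        ([7u8; 32], vec![ClaimableHTLC { prev_channel_id: [1; 32], htlc_id: 0 }]),
    ]);
    assert_eq!(reclaim_on_startup(&stored, &mut claimable).len(), 1);
    assert!(claimable.is_empty());
}
```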
@@ -7424,7 +7494,7 @@ pub mod bench {
                        tx = Transaction { version: 2, lock_time: 0, input: Vec::new(), output: vec![TxOut {
                                value: 8_000_000, script_pubkey: output_script,
                        }]};
-                       node_a.funding_transaction_generated(&temporary_channel_id, tx.clone()).unwrap();
+                       node_a.funding_transaction_generated(&temporary_channel_id, &node_b.get_our_node_id(), tx.clone()).unwrap();
                } else { panic!(); }
 
                node_b.handle_funding_created(&node_a.get_our_node_id(), &get_event_msg!(node_a_holder, MessageSendEvent::SendFundingCreated, node_b.get_our_node_id()));