Merge pull request #1897 from TheBlueMatt/2022-11-monitor-updates-always-async
authorMatt Corallo <649246+TheBlueMatt@users.noreply.github.com>
Wed, 22 Feb 2023 19:12:31 +0000 (19:12 +0000)
committerGitHub <noreply@github.com>
Wed, 22 Feb 2023 19:12:31 +0000 (19:12 +0000)
Always process `ChannelMonitorUpdate`s asynchronously

fuzz/src/chanmon_consistency.rs
lightning/src/chain/chainmonitor.rs
lightning/src/ln/chanmon_update_fail_tests.rs
lightning/src/ln/channel.rs
lightning/src/ln/channelmanager.rs
lightning/src/ln/functional_tests.rs
lightning/src/ln/payment_tests.rs
lightning/src/util/ser.rs

index 06d03fd0e9b28d5cb10e71eb87b8a7c5a02257a7..4386302f845b0d95279bbba99c975dede237df8a 100644 (file)
@@ -295,7 +295,7 @@ fn check_api_err(api_err: APIError) {
                        // all others. If you hit this panic, the list of acceptable errors
                        // is probably just stale and you should add new messages here.
                        match err.as_str() {
-                               "Peer for first hop currently disconnected/pending monitor update!" => {},
+                               "Peer for first hop currently disconnected" => {},
                                _ if err.starts_with("Cannot push more than their max accepted HTLCs ") => {},
                                _ if err.starts_with("Cannot send value that would put us over the max HTLC value in flight our peer will accept ") => {},
                                _ if err.starts_with("Cannot send value that would put our balance under counterparty-announced channel reserve value") => {},
index 3a2077209c4ea36ebf3b412a048f845e1b344c1f..9a74f891b1e985b371f2640ccf23f7c992e626ec 100644 (file)
@@ -796,7 +796,7 @@ mod tests {
        use crate::ln::functional_test_utils::*;
        use crate::ln::msgs::ChannelMessageHandler;
        use crate::util::errors::APIError;
-       use crate::util::events::{ClosureReason, MessageSendEvent, MessageSendEventsProvider};
+       use crate::util::events::{Event, ClosureReason, MessageSendEvent, MessageSendEventsProvider};
 
        #[test]
        fn test_async_ooo_offchain_updates() {
@@ -819,10 +819,8 @@ mod tests {
 
                nodes[1].node.claim_funds(payment_preimage_1);
                check_added_monitors!(nodes[1], 1);
-               expect_payment_claimed!(nodes[1], payment_hash_1, 1_000_000);
                nodes[1].node.claim_funds(payment_preimage_2);
                check_added_monitors!(nodes[1], 1);
-               expect_payment_claimed!(nodes[1], payment_hash_2, 1_000_000);
 
                let persistences = chanmon_cfgs[1].persister.offchain_monitor_updates.lock().unwrap().clone();
                assert_eq!(persistences.len(), 1);
@@ -850,8 +848,24 @@ mod tests {
                        .find(|(txo, _)| txo == funding_txo).unwrap().1.contains(&next_update));
                assert!(nodes[1].chain_monitor.release_pending_monitor_events().is_empty());
                assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty());
+               assert!(nodes[1].node.get_and_clear_pending_events().is_empty());
                nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(*funding_txo, update_iter.next().unwrap().clone()).unwrap();
 
+               let claim_events = nodes[1].node.get_and_clear_pending_events();
+               assert_eq!(claim_events.len(), 2);
+               match claim_events[0] {
+                       Event::PaymentClaimed { ref payment_hash, amount_msat: 1_000_000, .. } => {
+                               assert_eq!(payment_hash_1, *payment_hash);
+                       },
+                       _ => panic!("Unexpected event"),
+               }
+               match claim_events[1] {
+                       Event::PaymentClaimed { ref payment_hash, amount_msat: 1_000_000, .. } => {
+                               assert_eq!(payment_hash_2, *payment_hash);
+                       },
+                       _ => panic!("Unexpected event"),
+               }
+
                // Now manually walk the commitment signed dance - because we claimed two payments
                // back-to-back it doesn't fit into the neat walk commitment_signed_dance does.
 
index abc4f4a140707cc4792b20131a96eb83d2d09e9d..1532884fa605c5ba363720aa6701b602c83d92a2 100644 (file)
@@ -143,7 +143,7 @@ fn test_monitor_and_persister_update_fail() {
                let mut node_0_per_peer_lock;
                let mut node_0_peer_state_lock;
                let mut channel = get_channel_ref!(nodes[0], nodes[1], node_0_per_peer_lock, node_0_peer_state_lock, chan.2);
-               if let Ok((_, _, update)) = channel.commitment_signed(&updates.commitment_signed, &node_cfgs[0].logger) {
+               if let Ok(update) = channel.commitment_signed(&updates.commitment_signed, &node_cfgs[0].logger) {
                        // Check that even though the persister is returning a InProgress,
                        // because the update is bogus, ultimately the error that's returned
                        // should be a PermanentFailure.
@@ -1602,7 +1602,6 @@ fn test_monitor_update_fail_claim() {
 
        chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::InProgress);
        nodes[1].node.claim_funds(payment_preimage_1);
-       expect_payment_claimed!(nodes[1], payment_hash_1, 1_000_000);
        assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty());
        check_added_monitors!(nodes[1], 1);
 
@@ -1628,6 +1627,7 @@ fn test_monitor_update_fail_claim() {
        let events = nodes[1].node.get_and_clear_pending_msg_events();
        assert_eq!(events.len(), 0);
        commitment_signed_dance!(nodes[1], nodes[2], payment_event.commitment_msg, false, true);
+       expect_pending_htlcs_forwardable_ignore!(nodes[1]);
 
        let (_, payment_hash_3, payment_secret_3) = get_payment_preimage_hash!(nodes[0]);
        nodes[2].node.send_payment(&route, payment_hash_3, &Some(payment_secret_3), PaymentId(payment_hash_3.0)).unwrap();
@@ -1645,6 +1645,7 @@ fn test_monitor_update_fail_claim() {
        let channel_id = chan_1.2;
        let (outpoint, latest_update, _) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone();
        nodes[1].chain_monitor.chain_monitor.force_channel_monitor_updated(outpoint, latest_update);
+       expect_payment_claimed!(nodes[1], payment_hash_1, 1_000_000);
        check_added_monitors!(nodes[1], 0);
 
        let bs_fulfill_update = get_htlc_update_msgs!(nodes[1], nodes[0].node.get_our_node_id());
@@ -1653,7 +1654,7 @@ fn test_monitor_update_fail_claim() {
        expect_payment_sent!(nodes[0], payment_preimage_1);
 
        // Get the payment forwards, note that they were batched into one commitment update.
-       expect_pending_htlcs_forwardable!(nodes[1]);
+       nodes[1].node.process_pending_htlc_forwards();
        check_added_monitors!(nodes[1], 1);
        let bs_forward_update = get_htlc_update_msgs!(nodes[1], nodes[0].node.get_our_node_id());
        nodes[0].node.handle_update_add_htlc(&nodes[1].node.get_our_node_id(), &bs_forward_update.update_add_htlcs[0]);
@@ -1739,7 +1740,6 @@ fn test_monitor_update_on_pending_forwards() {
        chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::InProgress);
        expect_pending_htlcs_forwardable_and_htlc_handling_failed!(nodes[1], vec![HTLCDestination::NextHopChannel { node_id: Some(nodes[2].node.get_our_node_id()), channel_id: chan_2.2 }]);
        check_added_monitors!(nodes[1], 1);
-       assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty());
 
        chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::Completed);
        let (outpoint, latest_update, _) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&chan_1.2).unwrap().clone();
@@ -1753,17 +1753,17 @@ fn test_monitor_update_on_pending_forwards() {
 
        let events = nodes[0].node.get_and_clear_pending_events();
        assert_eq!(events.len(), 3);
-       if let Event::PaymentPathFailed { payment_hash, payment_failed_permanently, .. } = events[0] {
+       if let Event::PaymentPathFailed { payment_hash, payment_failed_permanently, .. } = events[1] {
                assert_eq!(payment_hash, payment_hash_1);
                assert!(payment_failed_permanently);
        } else { panic!("Unexpected event!"); }
-       match events[1] {
+       match events[2] {
                Event::PaymentFailed { payment_hash, .. } => {
                        assert_eq!(payment_hash, payment_hash_1);
                },
                _ => panic!("Unexpected event"),
        }
-       match events[2] {
+       match events[0] {
                Event::PendingHTLCsForwardable { .. } => { },
                _ => panic!("Unexpected event"),
        };
@@ -1803,7 +1803,6 @@ fn monitor_update_claim_fail_no_response() {
 
        chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::InProgress);
        nodes[1].node.claim_funds(payment_preimage_1);
-       expect_payment_claimed!(nodes[1], payment_hash_1, 1_000_000);
        check_added_monitors!(nodes[1], 1);
 
        assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty());
@@ -1811,6 +1810,7 @@ fn monitor_update_claim_fail_no_response() {
        chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::Completed);
        let (outpoint, latest_update, _) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone();
        nodes[1].chain_monitor.chain_monitor.force_channel_monitor_updated(outpoint, latest_update);
+       expect_payment_claimed!(nodes[1], payment_hash_1, 1_000_000);
        check_added_monitors!(nodes[1], 0);
        assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty());
 
@@ -2290,7 +2290,6 @@ fn do_channel_holding_cell_serialize(disconnect: bool, reload_a: bool) {
        chanmon_cfgs[0].persister.set_update_ret(ChannelMonitorUpdateStatus::InProgress);
        nodes[0].node.claim_funds(payment_preimage_0);
        check_added_monitors!(nodes[0], 1);
-       expect_payment_claimed!(nodes[0], payment_hash_0, 100_000);
 
        nodes[1].node.handle_update_add_htlc(&nodes[0].node.get_our_node_id(), &send.msgs[0]);
        nodes[1].node.handle_commitment_signed(&nodes[0].node.get_our_node_id(), &send.commitment_msg);
@@ -2353,6 +2352,7 @@ fn do_channel_holding_cell_serialize(disconnect: bool, reload_a: bool) {
        chanmon_cfgs[0].persister.set_update_ret(ChannelMonitorUpdateStatus::Completed);
        let (funding_txo, mon_id, _) = nodes[0].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&chan_id).unwrap().clone();
        nodes[0].chain_monitor.chain_monitor.force_channel_monitor_updated(funding_txo, mon_id);
+       expect_payment_claimed!(nodes[0], payment_hash_0, 100_000);
 
        // New outbound messages should be generated immediately upon a call to
        // get_and_clear_pending_msg_events (but not before).
@@ -2606,7 +2606,15 @@ fn test_permanent_error_during_sending_shutdown() {
        chanmon_cfgs[0].persister.set_update_ret(ChannelMonitorUpdateStatus::PermanentFailure);
 
        assert!(nodes[0].node.close_channel(&channel_id, &nodes[1].node.get_our_node_id()).is_ok());
-       check_closed_broadcast!(nodes[0], true);
+
+       // We always send the `shutdown` response when initiating a shutdown, even if we immediately
+       // close the channel thereafter.
+       let msg_events = nodes[0].node.get_and_clear_pending_msg_events();
+       assert_eq!(msg_events.len(), 3);
+       if let MessageSendEvent::SendShutdown { .. } = msg_events[0] {} else { panic!(); }
+       if let MessageSendEvent::BroadcastChannelUpdate { .. } = msg_events[1] {} else { panic!(); }
+       if let MessageSendEvent::HandleError { .. } =  msg_events[2] {} else { panic!(); }
+
        check_added_monitors!(nodes[0], 2);
        check_closed_event!(nodes[0], 1, ClosureReason::ProcessingError { err: "ChannelMonitor storage failure".to_string() });
 }
@@ -2629,7 +2637,15 @@ fn test_permanent_error_during_handling_shutdown() {
        assert!(nodes[0].node.close_channel(&channel_id, &nodes[1].node.get_our_node_id()).is_ok());
        let shutdown = get_event_msg!(nodes[0], MessageSendEvent::SendShutdown, nodes[1].node.get_our_node_id());
        nodes[1].node.handle_shutdown(&nodes[0].node.get_our_node_id(), &shutdown);
-       check_closed_broadcast!(nodes[1], true);
+
+       // We always send the `shutdown` response when receiving a shutdown, even if we immediately
+       // close the channel thereafter.
+       let msg_events = nodes[1].node.get_and_clear_pending_msg_events();
+       assert_eq!(msg_events.len(), 3);
+       if let MessageSendEvent::SendShutdown { .. } = msg_events[0] {} else { panic!(); }
+       if let MessageSendEvent::BroadcastChannelUpdate { .. } = msg_events[1] {} else { panic!(); }
+       if let MessageSendEvent::HandleError { .. } =  msg_events[2] {} else { panic!(); }
+
        check_added_monitors!(nodes[1], 2);
        check_closed_event!(nodes[1], 1, ClosureReason::ProcessingError { err: "ChannelMonitor storage failure".to_string() });
 }
@@ -2651,7 +2667,6 @@ fn double_temp_error() {
        // `claim_funds` results in a ChannelMonitorUpdate.
        nodes[1].node.claim_funds(payment_preimage_1);
        check_added_monitors!(nodes[1], 1);
-       expect_payment_claimed!(nodes[1], payment_hash_1, 1_000_000);
        let (funding_tx, latest_update_1, _) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone();
 
        chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::InProgress);
@@ -2659,7 +2674,6 @@ fn double_temp_error() {
        // which had some asserts that prevented it from being called twice.
        nodes[1].node.claim_funds(payment_preimage_2);
        check_added_monitors!(nodes[1], 1);
-       expect_payment_claimed!(nodes[1], payment_hash_2, 1_000_000);
        chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::Completed);
 
        let (_, latest_update_2, _) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone();
@@ -2668,11 +2682,24 @@ fn double_temp_error() {
        check_added_monitors!(nodes[1], 0);
        nodes[1].chain_monitor.chain_monitor.force_channel_monitor_updated(funding_tx, latest_update_2);
 
-       // Complete the first HTLC.
-       let events = nodes[1].node.get_and_clear_pending_msg_events();
-       assert_eq!(events.len(), 1);
+       // Complete the first HTLC. Note that as a side-effect we handle the monitor update completions
+       // and get both PaymentClaimed events at once.
+       let msg_events = nodes[1].node.get_and_clear_pending_msg_events();
+
+       let events = nodes[1].node.get_and_clear_pending_events();
+       assert_eq!(events.len(), 2);
+       match events[0] {
+               Event::PaymentClaimed { amount_msat: 1_000_000, payment_hash, .. } => assert_eq!(payment_hash, payment_hash_1),
+               _ => panic!("Unexpected Event: {:?}", events[0]),
+       }
+       match events[1] {
+               Event::PaymentClaimed { amount_msat: 1_000_000, payment_hash, .. } => assert_eq!(payment_hash, payment_hash_2),
+               _ => panic!("Unexpected Event: {:?}", events[1]),
+       }
+
+       assert_eq!(msg_events.len(), 1);
        let (update_fulfill_1, commitment_signed_b1, node_id) = {
-               match &events[0] {
+               match &msg_events[0] {
                        &MessageSendEvent::UpdateHTLCs { ref node_id, updates: msgs::CommitmentUpdate { ref update_add_htlcs, ref update_fulfill_htlcs, ref update_fail_htlcs, ref update_fail_malformed_htlcs, ref update_fee, ref commitment_signed } } => {
                                assert!(update_add_htlcs.is_empty());
                                assert_eq!(update_fulfill_htlcs.len(), 1);
index 62a2b7b51e52b723231fec7bec63f088c4c793a2..e3641a6204a96d6685098a86ba6fc9d67309e15d 100644 (file)
@@ -393,35 +393,21 @@ enum UpdateFulfillFetch {
 }
 
 /// The return type of get_update_fulfill_htlc_and_commit.
-pub enum UpdateFulfillCommitFetch {
+pub enum UpdateFulfillCommitFetch<'a> {
        /// Indicates the HTLC fulfill is new, and either generated an update_fulfill message, placed
        /// it in the holding cell, or re-generated the update_fulfill message after the same claim was
        /// previously placed in the holding cell (and has since been removed).
        NewClaim {
                /// The ChannelMonitorUpdate which places the new payment preimage in the channel monitor
-               monitor_update: ChannelMonitorUpdate,
+               monitor_update: &'a ChannelMonitorUpdate,
                /// The value of the HTLC which was claimed, in msat.
                htlc_value_msat: u64,
-               /// The update_fulfill message and commitment_signed message (if the claim was not placed
-               /// in the holding cell).
-               msgs: Option<(msgs::UpdateFulfillHTLC, msgs::CommitmentSigned)>,
        },
        /// Indicates the HTLC fulfill is duplicative and already existed either in the holding cell
        /// or has been forgotten (presumably previously claimed).
        DuplicateClaim {},
 }
 
-/// The return value of `revoke_and_ack` on success, primarily updates to other channels or HTLC
-/// state.
-pub(super) struct RAAUpdates {
-       pub commitment_update: Option<msgs::CommitmentUpdate>,
-       pub accepted_htlcs: Vec<(PendingHTLCInfo, u64)>,
-       pub failed_htlcs: Vec<(HTLCSource, PaymentHash, HTLCFailReason)>,
-       pub finalized_claimed_htlcs: Vec<HTLCSource>,
-       pub monitor_update: ChannelMonitorUpdate,
-       pub holding_cell_failed_htlcs: Vec<(HTLCSource, PaymentHash)>,
-}
-
 /// The return value of `monitor_updating_restored`
 pub(super) struct MonitorRestoreUpdates {
        pub raa: Option<msgs::RevokeAndACK>,
@@ -558,6 +544,11 @@ pub(super) struct Channel<Signer: ChannelSigner> {
        monitor_pending_channel_ready: bool,
        monitor_pending_revoke_and_ack: bool,
        monitor_pending_commitment_signed: bool,
+
+       // TODO: If a channel is drop'd, we don't know whether the `ChannelMonitor` is ultimately
+       // responsible for some of the HTLCs here or not - we don't know whether the update in question
+       // completed or not. We currently ignore these fields entirely when force-closing a channel,
+       // but need to handle this somehow or we run the risk of losing HTLCs!
        monitor_pending_forwards: Vec<(PendingHTLCInfo, u64)>,
        monitor_pending_failures: Vec<(HTLCSource, PaymentHash, HTLCFailReason)>,
        monitor_pending_finalized_fulfills: Vec<HTLCSource>,
@@ -743,6 +734,12 @@ pub(super) struct Channel<Signer: ChannelSigner> {
        /// The unique identifier used to re-derive the private key material for the channel through
        /// [`SignerProvider::derive_channel_signer`].
        channel_keys_id: [u8; 32],
+
+       /// When we generate [`ChannelMonitorUpdate`]s to persist, they may not be persisted immediately.
+       /// If we then persist the [`channelmanager::ChannelManager`] and crash before the persistence
+       /// completes we still need to be able to complete the persistence. Thus, we have to keep a
+       /// copy of the [`ChannelMonitorUpdate`] here until it is complete.
+       pending_monitor_updates: Vec<ChannelMonitorUpdate>,
 }
 
 #[cfg(any(test, fuzzing))]
@@ -1112,6 +1109,8 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
 
                        channel_type,
                        channel_keys_id,
+
+                       pending_monitor_updates: Vec::new(),
                })
        }
 
@@ -1458,6 +1457,8 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
 
                        channel_type,
                        channel_keys_id,
+
+                       pending_monitor_updates: Vec::new(),
                };
 
                Ok(chan)
@@ -1964,22 +1965,30 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
                }
        }
 
-       pub fn get_update_fulfill_htlc_and_commit<L: Deref>(&mut self, htlc_id: u64, payment_preimage: PaymentPreimage, logger: &L) -> Result<UpdateFulfillCommitFetch, (ChannelError, ChannelMonitorUpdate)> where L::Target: Logger {
+       pub fn get_update_fulfill_htlc_and_commit<L: Deref>(&mut self, htlc_id: u64, payment_preimage: PaymentPreimage, logger: &L) -> UpdateFulfillCommitFetch where L::Target: Logger {
                match self.get_update_fulfill_htlc(htlc_id, payment_preimage, logger) {
-                       UpdateFulfillFetch::NewClaim { mut monitor_update, htlc_value_msat, msg: Some(update_fulfill_htlc) } => {
-                               let (commitment, mut additional_update) = match self.send_commitment_no_status_check(logger) {
-                                       Err(e) => return Err((e, monitor_update)),
-                                       Ok(res) => res
-                               };
-                               // send_commitment_no_status_check may bump latest_monitor_id but we want them to be
+                       UpdateFulfillFetch::NewClaim { mut monitor_update, htlc_value_msat, msg: Some(_) } => {
+                               let mut additional_update = self.build_commitment_no_status_check(logger);
+                               // build_commitment_no_status_check may bump latest_monitor_id but we want them to be
                                // strictly increasing by one, so decrement it here.
                                self.latest_monitor_update_id = monitor_update.update_id;
                                monitor_update.updates.append(&mut additional_update.updates);
-                               Ok(UpdateFulfillCommitFetch::NewClaim { monitor_update, htlc_value_msat, msgs: Some((update_fulfill_htlc, commitment)) })
+                               self.monitor_updating_paused(false, true, false, Vec::new(), Vec::new(), Vec::new());
+                               self.pending_monitor_updates.push(monitor_update);
+                               UpdateFulfillCommitFetch::NewClaim {
+                                       monitor_update: self.pending_monitor_updates.last().unwrap(),
+                                       htlc_value_msat,
+                               }
                        },
-                       UpdateFulfillFetch::NewClaim { monitor_update, htlc_value_msat, msg: None } =>
-                               Ok(UpdateFulfillCommitFetch::NewClaim { monitor_update, htlc_value_msat, msgs: None }),
-                       UpdateFulfillFetch::DuplicateClaim {} => Ok(UpdateFulfillCommitFetch::DuplicateClaim {}),
+                       UpdateFulfillFetch::NewClaim { monitor_update, htlc_value_msat, msg: None } => {
+                               self.monitor_updating_paused(false, false, false, Vec::new(), Vec::new(), Vec::new());
+                               self.pending_monitor_updates.push(monitor_update);
+                               UpdateFulfillCommitFetch::NewClaim {
+                                       monitor_update: self.pending_monitor_updates.last().unwrap(),
+                                       htlc_value_msat,
+                               }
+                       }
+                       UpdateFulfillFetch::DuplicateClaim {} => UpdateFulfillCommitFetch::DuplicateClaim {},
                }
        }
 
@@ -2259,9 +2268,9 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
 
        pub fn funding_created<SP: Deref, L: Deref>(
                &mut self, msg: &msgs::FundingCreated, best_block: BestBlock, signer_provider: &SP, logger: &L
-       ) -> Result<(msgs::FundingSigned, ChannelMonitor<<SP::Target as SignerProvider>::Signer>, Option<msgs::ChannelReady>), ChannelError>
+       ) -> Result<(msgs::FundingSigned, ChannelMonitor<Signer>), ChannelError>
        where
-               SP::Target: SignerProvider,
+               SP::Target: SignerProvider<Signer = Signer>,
                L::Target: Logger
        {
                if self.is_outbound() {
@@ -2337,19 +2346,22 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
 
                log_info!(logger, "Generated funding_signed for peer for channel {}", log_bytes!(self.channel_id()));
 
+               let need_channel_ready = self.check_get_channel_ready(0).is_some();
+               self.monitor_updating_paused(false, false, need_channel_ready, Vec::new(), Vec::new(), Vec::new());
+
                Ok((msgs::FundingSigned {
                        channel_id: self.channel_id,
                        signature
-               }, channel_monitor, self.check_get_channel_ready(0)))
+               }, channel_monitor))
        }
 
        /// Handles a funding_signed message from the remote end.
        /// If this call is successful, broadcast the funding transaction (and not before!)
        pub fn funding_signed<SP: Deref, L: Deref>(
                &mut self, msg: &msgs::FundingSigned, best_block: BestBlock, signer_provider: &SP, logger: &L
-       ) -> Result<(ChannelMonitor<<SP::Target as SignerProvider>::Signer>, Transaction, Option<msgs::ChannelReady>), ChannelError>
+       ) -> Result<ChannelMonitor<Signer>, ChannelError>
        where
-               SP::Target: SignerProvider,
+               SP::Target: SignerProvider<Signer = Signer>,
                L::Target: Logger
        {
                if !self.is_outbound() {
@@ -2422,7 +2434,9 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
 
                log_info!(logger, "Received funding_signed from peer for channel {}", log_bytes!(self.channel_id()));
 
-               Ok((channel_monitor, self.funding_transaction.as_ref().cloned().unwrap(), self.check_get_channel_ready(0)))
+               let need_channel_ready = self.check_get_channel_ready(0).is_some();
+               self.monitor_updating_paused(false, false, need_channel_ready, Vec::new(), Vec::new(), Vec::new());
+               Ok(channel_monitor)
        }
 
        /// Handles a channel_ready message from our peer. If we've already sent our channel_ready
@@ -3034,17 +3048,17 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
                Ok(())
        }
 
-       pub fn commitment_signed<L: Deref>(&mut self, msg: &msgs::CommitmentSigned, logger: &L) -> Result<(msgs::RevokeAndACK, Option<msgs::CommitmentSigned>, ChannelMonitorUpdate), (Option<ChannelMonitorUpdate>, ChannelError)>
+       pub fn commitment_signed<L: Deref>(&mut self, msg: &msgs::CommitmentSigned, logger: &L) -> Result<&ChannelMonitorUpdate, ChannelError>
                where L::Target: Logger
        {
                if (self.channel_state & (ChannelState::ChannelReady as u32)) != (ChannelState::ChannelReady as u32) {
-                       return Err((None, ChannelError::Close("Got commitment signed message when channel was not in an operational state".to_owned())));
+                       return Err(ChannelError::Close("Got commitment signed message when channel was not in an operational state".to_owned()));
                }
                if self.channel_state & (ChannelState::PeerDisconnected as u32) == ChannelState::PeerDisconnected as u32 {
-                       return Err((None, ChannelError::Close("Peer sent commitment_signed when we needed a channel_reestablish".to_owned())));
+                       return Err(ChannelError::Close("Peer sent commitment_signed when we needed a channel_reestablish".to_owned()));
                }
                if self.channel_state & BOTH_SIDES_SHUTDOWN_MASK == BOTH_SIDES_SHUTDOWN_MASK && self.last_sent_closing_fee.is_some() {
-                       return Err((None, ChannelError::Close("Peer sent commitment_signed after we'd started exchanging closing_signeds".to_owned())));
+                       return Err(ChannelError::Close("Peer sent commitment_signed after we'd started exchanging closing_signeds".to_owned()));
                }
 
                let funding_script = self.get_funding_redeemscript();
@@ -3062,7 +3076,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
                                log_bytes!(self.counterparty_funding_pubkey().serialize()), encode::serialize_hex(&bitcoin_tx.transaction),
                                log_bytes!(sighash[..]), encode::serialize_hex(&funding_script), log_bytes!(self.channel_id()));
                        if let Err(_) = self.secp_ctx.verify_ecdsa(&sighash, &msg.signature, &self.counterparty_funding_pubkey()) {
-                               return Err((None, ChannelError::Close("Invalid commitment tx signature from peer".to_owned())));
+                               return Err(ChannelError::Close("Invalid commitment tx signature from peer".to_owned()));
                        }
                        bitcoin_tx.txid
                };
@@ -3077,7 +3091,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
                        debug_assert!(!self.is_outbound());
                        let counterparty_reserve_we_require_msat = self.holder_selected_channel_reserve_satoshis * 1000;
                        if commitment_stats.remote_balance_msat < commitment_stats.total_fee_sat * 1000 + counterparty_reserve_we_require_msat {
-                               return Err((None, ChannelError::Close("Funding remote cannot afford proposed new fee".to_owned())));
+                               return Err(ChannelError::Close("Funding remote cannot afford proposed new fee".to_owned()));
                        }
                }
                #[cfg(any(test, fuzzing))]
@@ -3099,7 +3113,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
                }
 
                if msg.htlc_signatures.len() != commitment_stats.num_nondust_htlcs {
-                       return Err((None, ChannelError::Close(format!("Got wrong number of HTLC signatures ({}) from remote. It must be {}", msg.htlc_signatures.len(), commitment_stats.num_nondust_htlcs))));
+                       return Err(ChannelError::Close(format!("Got wrong number of HTLC signatures ({}) from remote. It must be {}", msg.htlc_signatures.len(), commitment_stats.num_nondust_htlcs)));
                }
 
                // TODO: Sadly, we pass HTLCs twice to ChannelMonitor: once via the HolderCommitmentTransaction and once via the update
@@ -3117,7 +3131,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
                                        log_bytes!(msg.htlc_signatures[idx].serialize_compact()[..]), log_bytes!(keys.countersignatory_htlc_key.serialize()),
                                        encode::serialize_hex(&htlc_tx), log_bytes!(htlc_sighash[..]), encode::serialize_hex(&htlc_redeemscript), log_bytes!(self.channel_id()));
                                if let Err(_) = self.secp_ctx.verify_ecdsa(&htlc_sighash, &msg.htlc_signatures[idx], &keys.countersignatory_htlc_key) {
-                                       return Err((None, ChannelError::Close("Invalid HTLC tx signature from peer".to_owned())));
+                                       return Err(ChannelError::Close("Invalid HTLC tx signature from peer".to_owned()));
                                }
                                htlcs_and_sigs.push((htlc, Some(msg.htlc_signatures[idx]), source));
                        } else {
@@ -3133,10 +3147,8 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
                        self.counterparty_funding_pubkey()
                );
 
-               let next_per_commitment_point = self.holder_signer.get_per_commitment_point(self.cur_holder_commitment_transaction_number - 1, &self.secp_ctx);
                self.holder_signer.validate_holder_commitment(&holder_commitment_tx, commitment_stats.preimages)
-                       .map_err(|_| (None, ChannelError::Close("Failed to validate our commitment".to_owned())))?;
-               let per_commitment_secret = self.holder_signer.release_commitment_secret(self.cur_holder_commitment_transaction_number + 1);
+                       .map_err(|_| ChannelError::Close("Failed to validate our commitment".to_owned()))?;
 
                // Update state now that we've passed all the can-fail calls...
                let mut need_commitment = false;
@@ -3181,7 +3193,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
 
                self.cur_holder_commitment_transaction_number -= 1;
                // Note that if we need_commitment & !AwaitingRemoteRevoke we'll call
-               // send_commitment_no_status_check() next which will reset this to RAAFirst.
+               // build_commitment_no_status_check() next which will reset this to RAAFirst.
                self.resend_order = RAACommitmentOrder::CommitmentFirst;
 
                if (self.channel_state & ChannelState::MonitorUpdateInProgress as u32) != 0 {
@@ -3193,52 +3205,50 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
                                // the corresponding HTLC status updates so that get_last_commitment_update
                                // includes the right HTLCs.
                                self.monitor_pending_commitment_signed = true;
-                               let (_, mut additional_update) = self.send_commitment_no_status_check(logger).map_err(|e| (None, e))?;
-                               // send_commitment_no_status_check may bump latest_monitor_id but we want them to be
+                               let mut additional_update = self.build_commitment_no_status_check(logger);
+                               // build_commitment_no_status_check may bump latest_monitor_id but we want them to be
                                // strictly increasing by one, so decrement it here.
                                self.latest_monitor_update_id = monitor_update.update_id;
                                monitor_update.updates.append(&mut additional_update.updates);
                        }
                        log_debug!(logger, "Received valid commitment_signed from peer in channel {}, updated HTLC state but awaiting a monitor update resolution to reply.",
                                log_bytes!(self.channel_id));
-                       return Err((Some(monitor_update), ChannelError::Ignore("Previous monitor update failure prevented generation of RAA".to_owned())));
+                       self.pending_monitor_updates.push(monitor_update);
+                       return Ok(self.pending_monitor_updates.last().unwrap());
                }
 
-               let commitment_signed = if need_commitment && (self.channel_state & (ChannelState::AwaitingRemoteRevoke as u32)) == 0 {
+               let need_commitment_signed = if need_commitment && (self.channel_state & (ChannelState::AwaitingRemoteRevoke as u32)) == 0 {
                        // If we're AwaitingRemoteRevoke we can't send a new commitment here, but that's ok -
                        // we'll send one right away when we get the revoke_and_ack when we
                        // free_holding_cell_htlcs().
-                       let (msg, mut additional_update) = self.send_commitment_no_status_check(logger).map_err(|e| (None, e))?;
-                       // send_commitment_no_status_check may bump latest_monitor_id but we want them to be
+                       let mut additional_update = self.build_commitment_no_status_check(logger);
+                       // build_commitment_no_status_check may bump latest_monitor_id but we want them to be
                        // strictly increasing by one, so decrement it here.
                        self.latest_monitor_update_id = monitor_update.update_id;
                        monitor_update.updates.append(&mut additional_update.updates);
-                       Some(msg)
-               } else { None };
+                       true
+               } else { false };
 
                log_debug!(logger, "Received valid commitment_signed from peer in channel {}, updating HTLC state and responding with{} a revoke_and_ack.",
-                       log_bytes!(self.channel_id()), if commitment_signed.is_some() { " our own commitment_signed and" } else { "" });
-
-               Ok((msgs::RevokeAndACK {
-                       channel_id: self.channel_id,
-                       per_commitment_secret,
-                       next_per_commitment_point,
-               }, commitment_signed, monitor_update))
+                       log_bytes!(self.channel_id()), if need_commitment_signed { " our own commitment_signed and" } else { "" });
+               self.pending_monitor_updates.push(monitor_update);
+               self.monitor_updating_paused(true, need_commitment_signed, false, Vec::new(), Vec::new(), Vec::new());
+               return Ok(self.pending_monitor_updates.last().unwrap());
        }
 
        /// Public version of the below, checking relevant preconditions first.
        /// If we're not in a state where freeing the holding cell makes sense, this is a no-op and
        /// returns `(None, Vec::new())`.
-       pub fn maybe_free_holding_cell_htlcs<L: Deref>(&mut self, logger: &L) -> Result<(Option<(msgs::CommitmentUpdate, ChannelMonitorUpdate)>, Vec<(HTLCSource, PaymentHash)>), ChannelError> where L::Target: Logger {
+       pub fn maybe_free_holding_cell_htlcs<L: Deref>(&mut self, logger: &L) -> (Option<&ChannelMonitorUpdate>, Vec<(HTLCSource, PaymentHash)>) where L::Target: Logger {
                if self.channel_state >= ChannelState::ChannelReady as u32 &&
                   (self.channel_state & (ChannelState::AwaitingRemoteRevoke as u32 | ChannelState::PeerDisconnected as u32 | ChannelState::MonitorUpdateInProgress as u32)) == 0 {
                        self.free_holding_cell_htlcs(logger)
-               } else { Ok((None, Vec::new())) }
+               } else { (None, Vec::new()) }
        }
 
        /// Frees any pending commitment updates in the holding cell, generating the relevant messages
        /// for our counterparty.
-       fn free_holding_cell_htlcs<L: Deref>(&mut self, logger: &L) -> Result<(Option<(msgs::CommitmentUpdate, ChannelMonitorUpdate)>, Vec<(HTLCSource, PaymentHash)>), ChannelError> where L::Target: Logger {
+       fn free_holding_cell_htlcs<L: Deref>(&mut self, logger: &L) -> (Option<&ChannelMonitorUpdate>, Vec<(HTLCSource, PaymentHash)>) where L::Target: Logger {
                assert_eq!(self.channel_state & ChannelState::MonitorUpdateInProgress as u32, 0);
                if self.holding_cell_htlc_updates.len() != 0 || self.holding_cell_update_fee.is_some() {
                        log_trace!(logger, "Freeing holding cell with {} HTLC updates{} in channel {}", self.holding_cell_htlc_updates.len(),
@@ -3319,7 +3329,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
                                }
                        }
                        if update_add_htlcs.is_empty() && update_fulfill_htlcs.is_empty() && update_fail_htlcs.is_empty() && self.holding_cell_update_fee.is_none() {
-                               return Ok((None, htlcs_to_fail));
+                               return (None, htlcs_to_fail);
                        }
                        let update_fee = if let Some(feerate) = self.holding_cell_update_fee.take() {
                                self.send_update_fee(feerate, false, logger)
@@ -3327,8 +3337,8 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
                                None
                        };
 
-                       let (commitment_signed, mut additional_update) = self.send_commitment_no_status_check(logger)?;
-                       // send_commitment_no_status_check and get_update_fulfill_htlc may bump latest_monitor_id
+                       let mut additional_update = self.build_commitment_no_status_check(logger);
+                       // build_commitment_no_status_check and get_update_fulfill_htlc may bump latest_monitor_id
                        // but we want them to be strictly increasing by one, so reset it here.
                        self.latest_monitor_update_id = monitor_update.update_id;
                        monitor_update.updates.append(&mut additional_update.updates);
@@ -3337,16 +3347,11 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
                                log_bytes!(self.channel_id()), if update_fee.is_some() { "a fee update, " } else { "" },
                                update_add_htlcs.len(), update_fulfill_htlcs.len(), update_fail_htlcs.len());
 
-                       Ok((Some((msgs::CommitmentUpdate {
-                               update_add_htlcs,
-                               update_fulfill_htlcs,
-                               update_fail_htlcs,
-                               update_fail_malformed_htlcs: Vec::new(),
-                               update_fee,
-                               commitment_signed,
-                       }, monitor_update)), htlcs_to_fail))
+                       self.monitor_updating_paused(false, true, false, Vec::new(), Vec::new(), Vec::new());
+                       self.pending_monitor_updates.push(monitor_update);
+                       (Some(self.pending_monitor_updates.last().unwrap()), htlcs_to_fail)
                } else {
-                       Ok((None, Vec::new()))
+                       (None, Vec::new())
                }
        }
 
@@ -3355,7 +3360,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
        /// waiting on this revoke_and_ack. The generation of this new commitment_signed may also fail,
        /// generating an appropriate error *after* the channel state has been updated based on the
        /// revoke_and_ack message.
-       pub fn revoke_and_ack<L: Deref>(&mut self, msg: &msgs::RevokeAndACK, logger: &L) -> Result<RAAUpdates, ChannelError>
+       pub fn revoke_and_ack<L: Deref>(&mut self, msg: &msgs::RevokeAndACK, logger: &L) -> Result<(Vec<(HTLCSource, PaymentHash)>, &ChannelMonitorUpdate), ChannelError>
                where L::Target: Logger,
        {
                if (self.channel_state & (ChannelState::ChannelReady as u32)) != (ChannelState::ChannelReady as u32) {
@@ -3542,8 +3547,8 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
                                // When the monitor updating is restored we'll call get_last_commitment_update(),
                                // which does not update state, but we're definitely now awaiting a remote revoke
                                // before we can step forward any more, so set it here.
-                               let (_, mut additional_update) = self.send_commitment_no_status_check(logger)?;
-                               // send_commitment_no_status_check may bump latest_monitor_id but we want them to be
+                               let mut additional_update = self.build_commitment_no_status_check(logger);
+                               // build_commitment_no_status_check may bump latest_monitor_id but we want them to be
                                // strictly increasing by one, so decrement it here.
                                self.latest_monitor_update_id = monitor_update.update_id;
                                monitor_update.updates.append(&mut additional_update.updates);
@@ -3552,71 +3557,41 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
                        self.monitor_pending_failures.append(&mut revoked_htlcs);
                        self.monitor_pending_finalized_fulfills.append(&mut finalized_claimed_htlcs);
                        log_debug!(logger, "Received a valid revoke_and_ack for channel {} but awaiting a monitor update resolution to reply.", log_bytes!(self.channel_id()));
-                       return Ok(RAAUpdates {
-                               commitment_update: None, finalized_claimed_htlcs: Vec::new(),
-                               accepted_htlcs: Vec::new(), failed_htlcs: Vec::new(),
-                               monitor_update,
-                               holding_cell_failed_htlcs: Vec::new()
-                       });
+                       self.pending_monitor_updates.push(monitor_update);
+                       return Ok((Vec::new(), self.pending_monitor_updates.last().unwrap()));
                }
 
-               match self.free_holding_cell_htlcs(logger)? {
-                       (Some((mut commitment_update, mut additional_update)), htlcs_to_fail) => {
-                               commitment_update.update_fail_htlcs.reserve(update_fail_htlcs.len());
-                               for fail_msg in update_fail_htlcs.drain(..) {
-                                       commitment_update.update_fail_htlcs.push(fail_msg);
-                               }
-                               commitment_update.update_fail_malformed_htlcs.reserve(update_fail_malformed_htlcs.len());
-                               for fail_msg in update_fail_malformed_htlcs.drain(..) {
-                                       commitment_update.update_fail_malformed_htlcs.push(fail_msg);
-                               }
-
+               match self.free_holding_cell_htlcs(logger) {
+                       (Some(_), htlcs_to_fail) => {
+                               let mut additional_update = self.pending_monitor_updates.pop().unwrap();
                                // free_holding_cell_htlcs may bump latest_monitor_id multiple times but we want them to be
                                // strictly increasing by one, so decrement it here.
                                self.latest_monitor_update_id = monitor_update.update_id;
                                monitor_update.updates.append(&mut additional_update.updates);
 
-                               Ok(RAAUpdates {
-                                       commitment_update: Some(commitment_update),
-                                       finalized_claimed_htlcs,
-                                       accepted_htlcs: to_forward_infos,
-                                       failed_htlcs: revoked_htlcs,
-                                       monitor_update,
-                                       holding_cell_failed_htlcs: htlcs_to_fail
-                               })
+                               self.monitor_updating_paused(false, true, false, to_forward_infos, revoked_htlcs, finalized_claimed_htlcs);
+                               self.pending_monitor_updates.push(monitor_update);
+                               Ok((htlcs_to_fail, self.pending_monitor_updates.last().unwrap()))
                        },
                        (None, htlcs_to_fail) => {
                                if require_commitment {
-                                       let (commitment_signed, mut additional_update) = self.send_commitment_no_status_check(logger)?;
+                                       let mut additional_update = self.build_commitment_no_status_check(logger);
 
-                                       // send_commitment_no_status_check may bump latest_monitor_id but we want them to be
+                                       // build_commitment_no_status_check may bump latest_monitor_id but we want them to be
                                        // strictly increasing by one, so decrement it here.
                                        self.latest_monitor_update_id = monitor_update.update_id;
                                        monitor_update.updates.append(&mut additional_update.updates);
 
                                        log_debug!(logger, "Received a valid revoke_and_ack for channel {}. Responding with a commitment update with {} HTLCs failed.",
                                                log_bytes!(self.channel_id()), update_fail_htlcs.len() + update_fail_malformed_htlcs.len());
-                                       Ok(RAAUpdates {
-                                               commitment_update: Some(msgs::CommitmentUpdate {
-                                                       update_add_htlcs: Vec::new(),
-                                                       update_fulfill_htlcs: Vec::new(),
-                                                       update_fail_htlcs,
-                                                       update_fail_malformed_htlcs,
-                                                       update_fee: None,
-                                                       commitment_signed
-                                               }),
-                                               finalized_claimed_htlcs,
-                                               accepted_htlcs: to_forward_infos, failed_htlcs: revoked_htlcs,
-                                               monitor_update, holding_cell_failed_htlcs: htlcs_to_fail
-                                       })
+                                       self.monitor_updating_paused(false, true, false, to_forward_infos, revoked_htlcs, finalized_claimed_htlcs);
+                                       self.pending_monitor_updates.push(monitor_update);
+                                       Ok((htlcs_to_fail, self.pending_monitor_updates.last().unwrap()))
                                } else {
                                        log_debug!(logger, "Received a valid revoke_and_ack for channel {} with no reply necessary.", log_bytes!(self.channel_id()));
-                                       Ok(RAAUpdates {
-                                               commitment_update: None,
-                                               finalized_claimed_htlcs,
-                                               accepted_htlcs: to_forward_infos, failed_htlcs: revoked_htlcs,
-                                               monitor_update, holding_cell_failed_htlcs: htlcs_to_fail
-                                       })
+                                       self.monitor_updating_paused(false, false, false, to_forward_infos, revoked_htlcs, finalized_claimed_htlcs);
+                                       self.pending_monitor_updates.push(monitor_update);
+                                       Ok((htlcs_to_fail, self.pending_monitor_updates.last().unwrap()))
                                }
                        }
                }
@@ -3768,15 +3743,17 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
        }
 
        /// Indicates that a ChannelMonitor update is in progress and has not yet been fully persisted.
-       /// This must be called immediately after the [`chain::Watch`] call which returned
-       /// [`ChannelMonitorUpdateStatus::InProgress`].
+       /// This must be called before we return the [`ChannelMonitorUpdate`] back to the
+       /// [`ChannelManager`], which will call [`Self::monitor_updating_restored`] once the monitor
+       /// update completes (potentially immediately).
        /// The messages which were generated with the monitor update must *not* have been sent to the
        /// remote end, and must instead have been dropped. They will be regenerated when
        /// [`Self::monitor_updating_restored`] is called.
        ///
+       /// [`ChannelManager`]: super::channelmanager::ChannelManager
        /// [`chain::Watch`]: crate::chain::Watch
        /// [`ChannelMonitorUpdateStatus::InProgress`]: crate::chain::ChannelMonitorUpdateStatus::InProgress
-       pub fn monitor_updating_paused(&mut self, resend_raa: bool, resend_commitment: bool,
+       fn monitor_updating_paused(&mut self, resend_raa: bool, resend_commitment: bool,
                resend_channel_ready: bool, mut pending_forwards: Vec<(PendingHTLCInfo, u64)>,
                mut pending_fails: Vec<(HTLCSource, PaymentHash, HTLCFailReason)>,
                mut pending_finalized_claimed_htlcs: Vec<HTLCSource>
@@ -3803,6 +3780,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
        {
                assert_eq!(self.channel_state & ChannelState::MonitorUpdateInProgress as u32, ChannelState::MonitorUpdateInProgress as u32);
                self.channel_state &= !(ChannelState::MonitorUpdateInProgress as u32);
+               self.pending_monitor_updates.clear();
 
                // If we're past (or at) the FundingSent stage on an outbound channel, try to
                // (re-)broadcast the funding transaction as we may have declined to broadcast it when we
@@ -4280,7 +4258,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
 
        pub fn shutdown<SP: Deref>(
                &mut self, signer_provider: &SP, their_features: &InitFeatures, msg: &msgs::Shutdown
-       ) -> Result<(Option<msgs::Shutdown>, Option<ChannelMonitorUpdate>, Vec<(HTLCSource, PaymentHash)>), ChannelError>
+       ) -> Result<(Option<msgs::Shutdown>, Option<&ChannelMonitorUpdate>, Vec<(HTLCSource, PaymentHash)>), ChannelError>
        where SP::Target: SignerProvider
        {
                if self.channel_state & (ChannelState::PeerDisconnected as u32) == ChannelState::PeerDisconnected as u32 {
@@ -4336,12 +4314,15 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
 
                let monitor_update = if update_shutdown_script {
                        self.latest_monitor_update_id += 1;
-                       Some(ChannelMonitorUpdate {
+                       let monitor_update = ChannelMonitorUpdate {
                                update_id: self.latest_monitor_update_id,
                                updates: vec![ChannelMonitorUpdateStep::ShutdownScript {
                                        scriptpubkey: self.get_closing_scriptpubkey(),
                                }],
-                       })
+                       };
+                       self.monitor_updating_paused(false, false, false, Vec::new(), Vec::new(), Vec::new());
+                       self.pending_monitor_updates.push(monitor_update);
+                       Some(self.pending_monitor_updates.last().unwrap())
                } else { None };
                let shutdown = if send_shutdown {
                        Some(msgs::Shutdown {
@@ -4891,6 +4872,10 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
                (self.channel_state & ChannelState::MonitorUpdateInProgress as u32) != 0
        }
 
+       pub fn get_next_monitor_update(&self) -> Option<&ChannelMonitorUpdate> {
+               self.pending_monitor_updates.first()
+       }
+
        /// Returns true if funding_created was sent/received.
        pub fn is_funding_initiated(&self) -> bool {
                self.channel_state >= ChannelState::FundingSent as u32
@@ -5791,8 +5776,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
                Ok(Some(res))
        }
 
-       /// Only fails in case of bad keys
-       fn send_commitment_no_status_check<L: Deref>(&mut self, logger: &L) -> Result<(msgs::CommitmentSigned, ChannelMonitorUpdate), ChannelError> where L::Target: Logger {
+       fn build_commitment_no_status_check<L: Deref>(&mut self, logger: &L) -> ChannelMonitorUpdate where L::Target: Logger {
                log_trace!(logger, "Updating HTLC state for a newly-sent commitment_signed...");
                // We can upgrade the status of some HTLCs that are waiting on a commitment, even if we
                // fail to generate this, we still are at least at a position where upgrading their status
@@ -5825,15 +5809,9 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
                }
                self.resend_order = RAACommitmentOrder::RevokeAndACKFirst;
 
-               let (res, counterparty_commitment_txid, htlcs) = match self.send_commitment_no_state_update(logger) {
-                       Ok((res, (counterparty_commitment_tx, mut htlcs))) => {
-                               // Update state now that we've passed all the can-fail calls...
-                               let htlcs_no_ref: Vec<(HTLCOutputInCommitment, Option<Box<HTLCSource>>)> =
-                                       htlcs.drain(..).map(|(htlc, htlc_source)| (htlc, htlc_source.map(|source_ref| Box::new(source_ref.clone())))).collect();
-                               (res, counterparty_commitment_tx, htlcs_no_ref)
-                       },
-                       Err(e) => return Err(e),
-               };
+               let (counterparty_commitment_txid, mut htlcs_ref) = self.build_commitment_no_state_update(logger);
+               let htlcs: Vec<(HTLCOutputInCommitment, Option<Box<HTLCSource>>)> =
+                       htlcs_ref.drain(..).map(|(htlc, htlc_source)| (htlc, htlc_source.map(|source_ref| Box::new(source_ref.clone())))).collect();
 
                if self.announcement_sigs_state == AnnouncementSigsState::MessageSent {
                        self.announcement_sigs_state = AnnouncementSigsState::Committed;
@@ -5850,16 +5828,13 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
                        }]
                };
                self.channel_state |= ChannelState::AwaitingRemoteRevoke as u32;
-               Ok((res, monitor_update))
+               monitor_update
        }
 
-       /// Only fails in case of bad keys. Used for channel_reestablish commitment_signed generation
-       /// when we shouldn't change HTLC/channel state.
-       fn send_commitment_no_state_update<L: Deref>(&self, logger: &L) -> Result<(msgs::CommitmentSigned, (Txid, Vec<(HTLCOutputInCommitment, Option<&HTLCSource>)>)), ChannelError> where L::Target: Logger {
+       fn build_commitment_no_state_update<L: Deref>(&self, logger: &L) -> (Txid, Vec<(HTLCOutputInCommitment, Option<&HTLCSource>)>) where L::Target: Logger {
                let counterparty_keys = self.build_remote_transaction_keys();
                let commitment_stats = self.build_commitment_transaction(self.cur_counterparty_commitment_transaction_number, &counterparty_keys, false, true, logger);
                let counterparty_commitment_txid = commitment_stats.tx.trust().txid();
-               let (signature, htlc_signatures);
 
                #[cfg(any(test, fuzzing))]
                {
@@ -5879,6 +5854,21 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
                        }
                }
 
+               (counterparty_commitment_txid, commitment_stats.htlcs_included)
+       }
+
+       /// Only fails in case of signer rejection. Used for channel_reestablish commitment_signed
+       /// generation when we shouldn't change HTLC/channel state.
+       fn send_commitment_no_state_update<L: Deref>(&self, logger: &L) -> Result<(msgs::CommitmentSigned, (Txid, Vec<(HTLCOutputInCommitment, Option<&HTLCSource>)>)), ChannelError> where L::Target: Logger {
+               // Get the fee tests from `build_commitment_no_state_update`
+               #[cfg(any(test, fuzzing))]
+               self.build_commitment_no_state_update(logger);
+
+               let counterparty_keys = self.build_remote_transaction_keys();
+               let commitment_stats = self.build_commitment_transaction(self.cur_counterparty_commitment_transaction_number, &counterparty_keys, false, true, logger);
+               let counterparty_commitment_txid = commitment_stats.tx.trust().txid();
+               let (signature, htlc_signatures);
+
                {
                        let mut htlcs = Vec::with_capacity(commitment_stats.htlcs_included.len());
                        for &(ref htlc, _) in commitment_stats.htlcs_included.iter() {
@@ -5911,16 +5901,20 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
                }, (counterparty_commitment_txid, commitment_stats.htlcs_included)))
        }
 
-       /// Adds a pending outbound HTLC to this channel, and creates a signed commitment transaction
-       /// to send to the remote peer in one go.
+       /// Adds a pending outbound HTLC to this channel, and builds a new remote commitment
+       /// transaction and generates the corresponding [`ChannelMonitorUpdate`] in one go.
        ///
        /// Shorthand for calling [`Self::send_htlc`] followed by a commitment update, see docs on
-       /// [`Self::send_htlc`] and [`Self::send_commitment_no_state_update`] for more info.
-       pub fn send_htlc_and_commit<L: Deref>(&mut self, amount_msat: u64, payment_hash: PaymentHash, cltv_expiry: u32, source: HTLCSource, onion_routing_packet: msgs::OnionPacket, logger: &L) -> Result<Option<(msgs::UpdateAddHTLC, msgs::CommitmentSigned, ChannelMonitorUpdate)>, ChannelError> where L::Target: Logger {
-               match self.send_htlc(amount_msat, payment_hash, cltv_expiry, source, onion_routing_packet, false, logger)? {
-                       Some(update_add_htlc) => {
-                               let (commitment_signed, monitor_update) = self.send_commitment_no_status_check(logger)?;
-                               Ok(Some((update_add_htlc, commitment_signed, monitor_update)))
+       /// [`Self::send_htlc`] and [`Self::build_commitment_no_state_update`] for more info.
+       pub fn send_htlc_and_commit<L: Deref>(&mut self, amount_msat: u64, payment_hash: PaymentHash, cltv_expiry: u32, source: HTLCSource, onion_routing_packet: msgs::OnionPacket, logger: &L) -> Result<Option<&ChannelMonitorUpdate>, ChannelError> where L::Target: Logger {
+               let send_res = self.send_htlc(amount_msat, payment_hash, cltv_expiry, source, onion_routing_packet, false, logger);
+               if let Err(e) = &send_res { if let ChannelError::Ignore(_) = e {} else { debug_assert!(false, "Sending cannot trigger channel failure"); } }
+               match send_res? {
+                       Some(_) => {
+                               let monitor_update = self.build_commitment_no_status_check(logger);
+                               self.monitor_updating_paused(false, true, false, Vec::new(), Vec::new(), Vec::new());
+                               self.pending_monitor_updates.push(monitor_update);
+                               Ok(Some(self.pending_monitor_updates.last().unwrap()))
                        },
                        None => Ok(None)
                }
@@ -5946,8 +5940,12 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
 
        /// Begins the shutdown process, getting a message for the remote peer and returning all
        /// holding cell HTLCs for payment failure.
-       pub fn get_shutdown<SP: Deref>(&mut self, signer_provider: &SP, their_features: &InitFeatures, target_feerate_sats_per_kw: Option<u32>)
-       -> Result<(msgs::Shutdown, Option<ChannelMonitorUpdate>, Vec<(HTLCSource, PaymentHash)>), APIError>
+       ///
+       /// May jump to the channel being fully shutdown (see [`Self::is_shutdown`]) in which case no
+       /// [`ChannelMonitorUpdate`] will be returned).
+       pub fn get_shutdown<SP: Deref>(&mut self, signer_provider: &SP, their_features: &InitFeatures,
+               target_feerate_sats_per_kw: Option<u32>)
+       -> Result<(msgs::Shutdown, Option<&ChannelMonitorUpdate>, Vec<(HTLCSource, PaymentHash)>), APIError>
        where SP::Target: SignerProvider {
                for htlc in self.pending_outbound_htlcs.iter() {
                        if let OutboundHTLCState::LocalAnnounced(_) = htlc.state {
@@ -5967,9 +5965,16 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
                        return Err(APIError::ChannelUnavailable{err: "Cannot begin shutdown while peer is disconnected or we're waiting on a monitor update, maybe force-close instead?".to_owned()});
                }
 
+               // If we haven't funded the channel yet, we don't need to bother ensuring the shutdown
+               // script is set, we just force-close and call it a day.
+               let mut chan_closed = false;
+               if self.channel_state < ChannelState::FundingSent as u32 {
+                       chan_closed = true;
+               }
+
                let update_shutdown_script = match self.shutdown_scriptpubkey {
                        Some(_) => false,
-                       None => {
+                       None if !chan_closed => {
                                let shutdown_scriptpubkey = signer_provider.get_shutdown_scriptpubkey();
                                if !shutdown_scriptpubkey.is_compatible(their_features) {
                                        return Err(APIError::IncompatibleShutdownScript { script: shutdown_scriptpubkey.clone() });
@@ -5977,6 +5982,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
                                self.shutdown_scriptpubkey = Some(shutdown_scriptpubkey);
                                true
                        },
+                       None => false,
                };
 
                // From here on out, we may not fail!
@@ -5990,12 +5996,15 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
 
                let monitor_update = if update_shutdown_script {
                        self.latest_monitor_update_id += 1;
-                       Some(ChannelMonitorUpdate {
+                       let monitor_update = ChannelMonitorUpdate {
                                update_id: self.latest_monitor_update_id,
                                updates: vec![ChannelMonitorUpdateStep::ShutdownScript {
                                        scriptpubkey: self.get_closing_scriptpubkey(),
                                }],
-                       })
+                       };
+                       self.monitor_updating_paused(false, false, false, Vec::new(), Vec::new(), Vec::new());
+                       self.pending_monitor_updates.push(monitor_update);
+                       Some(self.pending_monitor_updates.last().unwrap())
                } else { None };
                let shutdown = msgs::Shutdown {
                        channel_id: self.channel_id,
@@ -6016,6 +6025,9 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
                        }
                });
 
+               debug_assert!(!self.is_shutdown() || monitor_update.is_none(),
+                       "we can't both complete shutdown and return a monitor update");
+
                Ok((shutdown, monitor_update, dropped_outbound_htlcs))
        }
 
@@ -6877,6 +6889,8 @@ impl<'a, 'b, 'c, ES: Deref, SP: Deref> ReadableArgs<(&'a ES, &'b SP, u32, &'c Ch
 
                        channel_type: channel_type.unwrap(),
                        channel_keys_id,
+
+                       pending_monitor_updates: Vec::new(),
                })
        }
 }
@@ -7188,7 +7202,7 @@ mod tests {
                }]};
                let funding_outpoint = OutPoint{ txid: tx.txid(), index: 0 };
                let funding_created_msg = node_a_chan.get_outbound_funding_created(tx.clone(), funding_outpoint, &&logger).unwrap();
-               let (funding_signed_msg, _, _) = node_b_chan.funding_created(&funding_created_msg, best_block, &&keys_provider, &&logger).unwrap();
+               let (funding_signed_msg, _) = node_b_chan.funding_created(&funding_created_msg, best_block, &&keys_provider, &&logger).unwrap();
 
                // Node B --> Node A: funding signed
                let _ = node_a_chan.funding_signed(&funding_signed_msg, best_block, &&keys_provider, &&logger);
index 45f45e138629d7dc6ddf2ed27b04d45e55e174ed..61de296c1442f3096fcb8b88efbd341157c4de6d 100644 (file)
@@ -65,6 +65,8 @@ use crate::util::ser::{BigSize, FixedLengthReader, Readable, ReadableArgs, Maybe
 use crate::util::logger::{Level, Logger};
 use crate::util::errors::APIError;
 
+use alloc::collections::BTreeMap;
+
 use crate::io;
 use crate::prelude::*;
 use core::{cmp, mem};
@@ -343,17 +345,6 @@ impl MsgHandleErrInternal {
                }
        }
        #[inline]
-       fn ignore_no_close(err: String) -> Self {
-               Self {
-                       err: LightningError {
-                               err,
-                               action: msgs::ErrorAction::IgnoreError,
-                       },
-                       chan_id: None,
-                       shutdown_finish: None,
-               }
-       }
-       #[inline]
        fn from_no_close(err: msgs::LightningError) -> Self {
                Self { err, chan_id: None, shutdown_finish: None }
        }
@@ -464,6 +455,7 @@ enum BackgroundEvent {
        ClosingMonitorUpdate((OutPoint, ChannelMonitorUpdate)),
 }
 
+#[derive(Debug)]
 pub(crate) enum MonitorUpdateCompletionAction {
        /// Indicates that a payment ultimately destined for us was claimed and we should emit an
        /// [`events::Event::PaymentClaimed`] to the user if we haven't yet generated such an event for
@@ -474,6 +466,11 @@ pub(crate) enum MonitorUpdateCompletionAction {
        EmitEvent { event: events::Event },
 }
 
+impl_writeable_tlv_based_enum_upgradable!(MonitorUpdateCompletionAction,
+       (0, PaymentClaimed) => { (0, payment_hash, required) },
+       (2, EmitEvent) => { (0, event, ignorable) },
+);
+
 /// State we hold per-peer.
 pub(super) struct PeerState<Signer: ChannelSigner> {
        /// `temporary_channel_id` or `channel_id` -> `channel`.
@@ -487,6 +484,21 @@ pub(super) struct PeerState<Signer: ChannelSigner> {
        /// Messages to send to the peer - pushed to in the same lock that they are generated in (except
        /// for broadcast messages, where ordering isn't as strict).
        pub(super) pending_msg_events: Vec<MessageSendEvent>,
+       /// Map from a specific channel to some action(s) that should be taken when all pending
+       /// [`ChannelMonitorUpdate`]s for the channel complete updating.
+       ///
+       /// Note that because we generally only have one entry here a HashMap is pretty overkill. A
+       /// BTreeMap currently stores more than ten elements per leaf node, so even up to a few
+       /// channels with a peer this will just be one allocation and will amount to a linear list of
+       /// channels to walk, avoiding the whole hashing rigmarole.
+       ///
+       /// Note that the channel may no longer exist. For example, if a channel was closed but we
+       /// later needed to claim an HTLC which is pending on-chain, we may generate a monitor update
+       /// for a missing channel. While a malicious peer could construct a second channel with the
+       /// same `temporary_channel_id` (or final `channel_id` in the case of 0conf channels or prior
+       /// to funding appearing on-chain), the downstream `ChannelMonitor` set is required to ensure
+       /// duplicates do not occur, so such channels should fail without a monitor update completing.
+       monitor_update_blocked_actions: BTreeMap<[u8; 32], Vec<MonitorUpdateCompletionAction>>,
        /// The peer is currently connected (i.e. we've seen a
        /// [`ChannelMessageHandler::peer_connected`] and no corresponding
        /// [`ChannelMessageHandler::peer_disconnected`].
@@ -501,7 +513,7 @@ impl <Signer: ChannelSigner> PeerState<Signer> {
                if require_disconnected && self.is_connected {
                        return false
                }
-               self.channel_by_id.len() == 0
+               self.channel_by_id.is_empty() && self.monitor_update_blocked_actions.is_empty()
        }
 }
 
@@ -1367,78 +1379,6 @@ macro_rules! remove_channel {
        }
 }
 
-macro_rules! handle_monitor_update_res {
-       ($self: ident, $err: expr, $chan: expr, $action_type: path, $resend_raa: expr, $resend_commitment: expr, $resend_channel_ready: expr, $failed_forwards: expr, $failed_fails: expr, $failed_finalized_fulfills: expr, $chan_id: expr) => {
-               match $err {
-                       ChannelMonitorUpdateStatus::PermanentFailure => {
-                               log_error!($self.logger, "Closing channel {} due to monitor update ChannelMonitorUpdateStatus::PermanentFailure", log_bytes!($chan_id[..]));
-                               update_maps_on_chan_removal!($self, $chan);
-                               // TODO: $failed_fails is dropped here, which will cause other channels to hit the
-                               // chain in a confused state! We need to move them into the ChannelMonitor which
-                               // will be responsible for failing backwards once things confirm on-chain.
-                               // It's ok that we drop $failed_forwards here - at this point we'd rather they
-                               // broadcast HTLC-Timeout and pay the associated fees to get their funds back than
-                               // us bother trying to claim it just to forward on to another peer. If we're
-                               // splitting hairs we'd prefer to claim payments that were to us, but we haven't
-                               // given up the preimage yet, so might as well just wait until the payment is
-                               // retried, avoiding the on-chain fees.
-                               let res: Result<(), _> = Err(MsgHandleErrInternal::from_finish_shutdown("ChannelMonitor storage failure".to_owned(), *$chan_id, $chan.get_user_id(),
-                                               $chan.force_shutdown(false), $self.get_channel_update_for_broadcast(&$chan).ok() ));
-                               (res, true)
-                       },
-                       ChannelMonitorUpdateStatus::InProgress => {
-                               log_info!($self.logger, "Disabling channel {} due to monitor update in progress. On restore will send {} and process {} forwards, {} fails, and {} fulfill finalizations",
-                                               log_bytes!($chan_id[..]),
-                                               if $resend_commitment && $resend_raa {
-                                                               match $action_type {
-                                                                       RAACommitmentOrder::CommitmentFirst => { "commitment then RAA" },
-                                                                       RAACommitmentOrder::RevokeAndACKFirst => { "RAA then commitment" },
-                                                               }
-                                                       } else if $resend_commitment { "commitment" }
-                                                       else if $resend_raa { "RAA" }
-                                                       else { "nothing" },
-                                               (&$failed_forwards as &Vec<(PendingHTLCInfo, u64)>).len(),
-                                               (&$failed_fails as &Vec<(HTLCSource, PaymentHash, HTLCFailReason)>).len(),
-                                               (&$failed_finalized_fulfills as &Vec<HTLCSource>).len());
-                               if !$resend_commitment {
-                                       debug_assert!($action_type == RAACommitmentOrder::RevokeAndACKFirst || !$resend_raa);
-                               }
-                               if !$resend_raa {
-                                       debug_assert!($action_type == RAACommitmentOrder::CommitmentFirst || !$resend_commitment);
-                               }
-                               $chan.monitor_updating_paused($resend_raa, $resend_commitment, $resend_channel_ready, $failed_forwards, $failed_fails, $failed_finalized_fulfills);
-                               (Err(MsgHandleErrInternal::from_chan_no_close(ChannelError::Ignore("Failed to update ChannelMonitor".to_owned()), *$chan_id)), false)
-                       },
-                       ChannelMonitorUpdateStatus::Completed => {
-                               (Ok(()), false)
-                       },
-               }
-       };
-       ($self: ident, $err: expr, $entry: expr, $action_type: path, $resend_raa: expr, $resend_commitment: expr, $resend_channel_ready: expr, $failed_forwards: expr, $failed_fails: expr, $failed_finalized_fulfills: expr) => { {
-               let (res, drop) = handle_monitor_update_res!($self, $err, $entry.get_mut(), $action_type, $resend_raa, $resend_commitment, $resend_channel_ready, $failed_forwards, $failed_fails, $failed_finalized_fulfills, $entry.key());
-               if drop {
-                       $entry.remove_entry();
-               }
-               res
-       } };
-       ($self: ident, $err: expr, $entry: expr, $action_type: path, $chan_id: expr, COMMITMENT_UPDATE_ONLY) => { {
-               debug_assert!($action_type == RAACommitmentOrder::CommitmentFirst);
-               handle_monitor_update_res!($self, $err, $entry, $action_type, false, true, false, Vec::new(), Vec::new(), Vec::new(), $chan_id)
-       } };
-       ($self: ident, $err: expr, $entry: expr, $action_type: path, $chan_id: expr, NO_UPDATE) => {
-               handle_monitor_update_res!($self, $err, $entry, $action_type, false, false, false, Vec::new(), Vec::new(), Vec::new(), $chan_id)
-       };
-       ($self: ident, $err: expr, $entry: expr, $action_type: path, $resend_channel_ready: expr, OPTIONALLY_RESEND_FUNDING_LOCKED) => {
-               handle_monitor_update_res!($self, $err, $entry, $action_type, false, false, $resend_channel_ready, Vec::new(), Vec::new(), Vec::new())
-       };
-       ($self: ident, $err: expr, $entry: expr, $action_type: path, $resend_raa: expr, $resend_commitment: expr) => {
-               handle_monitor_update_res!($self, $err, $entry, $action_type, $resend_raa, $resend_commitment, false, Vec::new(), Vec::new(), Vec::new())
-       };
-       ($self: ident, $err: expr, $entry: expr, $action_type: path, $resend_raa: expr, $resend_commitment: expr, $failed_forwards: expr, $failed_fails: expr) => {
-               handle_monitor_update_res!($self, $err, $entry, $action_type, $resend_raa, $resend_commitment, false, $failed_forwards, $failed_fails, Vec::new())
-       };
-}
-
 macro_rules! send_channel_ready {
        ($self: ident, $pending_msg_events: expr, $channel: expr, $channel_ready_msg: expr) => {{
                $pending_msg_events.push(events::MessageSendEvent::SendChannelReady {
@@ -1476,6 +1416,93 @@ macro_rules! emit_channel_ready_event {
        }
 }
 
+macro_rules! handle_monitor_update_completion {
+       ($self: ident, $update_id: expr, $peer_state_lock: expr, $peer_state: expr, $chan: expr) => { {
+               let mut updates = $chan.monitor_updating_restored(&$self.logger,
+                       &$self.node_signer, $self.genesis_hash, &$self.default_configuration,
+                       $self.best_block.read().unwrap().height());
+               let counterparty_node_id = $chan.get_counterparty_node_id();
+               let channel_update = if updates.channel_ready.is_some() && $chan.is_usable() {
+                       // We only send a channel_update in the case where we are just now sending a
+                       // channel_ready and the channel is in a usable state. We may re-send a
+                       // channel_update later through the announcement_signatures process for public
+                       // channels, but there's no reason not to just inform our counterparty of our fees
+                       // now.
+                       if let Ok(msg) = $self.get_channel_update_for_unicast($chan) {
+                               Some(events::MessageSendEvent::SendChannelUpdate {
+                                       node_id: counterparty_node_id,
+                                       msg,
+                               })
+                       } else { None }
+               } else { None };
+
+               let update_actions = $peer_state.monitor_update_blocked_actions
+                       .remove(&$chan.channel_id()).unwrap_or(Vec::new());
+
+               let htlc_forwards = $self.handle_channel_resumption(
+                       &mut $peer_state.pending_msg_events, $chan, updates.raa,
+                       updates.commitment_update, updates.order, updates.accepted_htlcs,
+                       updates.funding_broadcastable, updates.channel_ready,
+                       updates.announcement_sigs);
+               if let Some(upd) = channel_update {
+                       $peer_state.pending_msg_events.push(upd);
+               }
+
+               let channel_id = $chan.channel_id();
+               core::mem::drop($peer_state_lock);
+
+               $self.handle_monitor_update_completion_actions(update_actions);
+
+               if let Some(forwards) = htlc_forwards {
+                       $self.forward_htlcs(&mut [forwards][..]);
+               }
+               $self.finalize_claims(updates.finalized_claimed_htlcs);
+               for failure in updates.failed_htlcs.drain(..) {
+                       let receiver = HTLCDestination::NextHopChannel { node_id: Some(counterparty_node_id), channel_id };
+                       $self.fail_htlc_backwards_internal(&failure.0, &failure.1, &failure.2, receiver);
+               }
+       } }
+}
+
+macro_rules! handle_new_monitor_update {
+       ($self: ident, $update_res: expr, $update_id: expr, $peer_state_lock: expr, $peer_state: expr, $chan: expr, MANUALLY_REMOVING, $remove: expr) => { {
+               // update_maps_on_chan_removal needs to be able to take id_to_peer, so make sure we can in
+               // any case so that it won't deadlock.
+               debug_assert!($self.id_to_peer.try_lock().is_ok());
+               match $update_res {
+                       ChannelMonitorUpdateStatus::InProgress => {
+                               log_debug!($self.logger, "ChannelMonitor update for {} in flight, holding messages until the update completes.",
+                                       log_bytes!($chan.channel_id()[..]));
+                               Ok(())
+                       },
+                       ChannelMonitorUpdateStatus::PermanentFailure => {
+                               log_error!($self.logger, "Closing channel {} due to monitor update ChannelMonitorUpdateStatus::PermanentFailure",
+                                       log_bytes!($chan.channel_id()[..]));
+                               update_maps_on_chan_removal!($self, $chan);
+                               let res: Result<(), _> = Err(MsgHandleErrInternal::from_finish_shutdown(
+                                       "ChannelMonitor storage failure".to_owned(), $chan.channel_id(),
+                                       $chan.get_user_id(), $chan.force_shutdown(false),
+                                       $self.get_channel_update_for_broadcast(&$chan).ok()));
+                               $remove;
+                               res
+                       },
+                       ChannelMonitorUpdateStatus::Completed => {
+                               if ($update_id == 0 || $chan.get_next_monitor_update()
+                                       .expect("We can't be processing a monitor update if it isn't queued")
+                                       .update_id == $update_id) &&
+                                       $chan.get_latest_monitor_update_id() == $update_id
+                               {
+                                       handle_monitor_update_completion!($self, $update_id, $peer_state_lock, $peer_state, $chan);
+                               }
+                               Ok(())
+                       },
+               }
+       } };
+       ($self: ident, $update_res: expr, $update_id: expr, $peer_state_lock: expr, $peer_state: expr, $chan_entry: expr) => {
+               handle_new_monitor_update!($self, $update_res, $update_id, $peer_state_lock, $peer_state, $chan_entry.get_mut(), MANUALLY_REMOVING, $chan_entry.remove_entry())
+       }
+}
+
 impl<M: Deref, T: Deref, ES: Deref, NS: Deref, SP: Deref, F: Deref, R: Deref, L: Deref> ChannelManager<M, T, ES, NS, SP, F, R, L>
 where
        M::Target: chain::Watch<<SP::Target as SignerProvider>::Signer>,
@@ -1790,25 +1817,27 @@ where
                        let peer_state = &mut *peer_state_lock;
                        match peer_state.channel_by_id.entry(channel_id.clone()) {
                                hash_map::Entry::Occupied(mut chan_entry) => {
-                                       let (shutdown_msg, monitor_update, htlcs) = chan_entry.get_mut().get_shutdown(&self.signer_provider, &peer_state.latest_features, target_feerate_sats_per_1000_weight)?;
+                                       let funding_txo_opt = chan_entry.get().get_funding_txo();
+                                       let their_features = &peer_state.latest_features;
+                                       let (shutdown_msg, mut monitor_update_opt, htlcs) = chan_entry.get_mut()
+                                               .get_shutdown(&self.signer_provider, their_features, target_feerate_sats_per_1000_weight)?;
                                        failed_htlcs = htlcs;
 
-                                       // Update the monitor with the shutdown script if necessary.
-                                       if let Some(monitor_update) = monitor_update {
-                                               let update_res = self.chain_monitor.update_channel(chan_entry.get().get_funding_txo().unwrap(), &monitor_update);
-                                               let (result, is_permanent) =
-                                                       handle_monitor_update_res!(self, update_res, chan_entry.get_mut(), RAACommitmentOrder::CommitmentFirst, chan_entry.key(), NO_UPDATE);
-                                               if is_permanent {
-                                                       remove_channel!(self, chan_entry);
-                                                       break result;
-                                               }
-                                       }
-
+                                       // We can send the `shutdown` message before updating the `ChannelMonitor`
+                                       // here as we don't need the monitor update to complete until we send a
+                                       // `shutdown_signed`, which we'll delay if we're pending a monitor update.
                                        peer_state.pending_msg_events.push(events::MessageSendEvent::SendShutdown {
                                                node_id: *counterparty_node_id,
-                                               msg: shutdown_msg
+                                               msg: shutdown_msg,
                                        });
 
+                                       // Update the monitor with the shutdown script if necessary.
+                                       if let Some(monitor_update) = monitor_update_opt.take() {
+                                               let update_id = monitor_update.update_id;
+                                               let update_res = self.chain_monitor.update_channel(funding_txo_opt.unwrap(), monitor_update);
+                                               break handle_new_monitor_update!(self, update_res, update_id, peer_state_lock, peer_state, chan_entry);
+                                       }
+
                                        if chan_entry.get().is_shutdown() {
                                                let channel = remove_channel!(self, chan_entry);
                                                if let Ok(channel_update) = self.get_channel_update_for_broadcast(&channel) {
@@ -2412,54 +2441,35 @@ where
                        let mut peer_state_lock = peer_state_mutex.lock().unwrap();
                        let peer_state = &mut *peer_state_lock;
                        if let hash_map::Entry::Occupied(mut chan) = peer_state.channel_by_id.entry(id) {
-                               match {
-                                       if !chan.get().is_live() {
-                                               return Err(APIError::ChannelUnavailable{err: "Peer for first hop currently disconnected/pending monitor update!".to_owned()});
-                                       }
-                                       break_chan_entry!(self, chan.get_mut().send_htlc_and_commit(
-                                               htlc_msat, payment_hash.clone(), htlc_cltv, HTLCSource::OutboundRoute {
-                                                       path: path.clone(),
-                                                       session_priv: session_priv.clone(),
-                                                       first_hop_htlc_msat: htlc_msat,
-                                                       payment_id,
-                                                       payment_secret: payment_secret.clone(),
-                                                       payment_params: payment_params.clone(),
-                                               }, onion_packet, &self.logger),
-                                               chan)
-                               } {
-                                       Some((update_add, commitment_signed, monitor_update)) => {
-                                               let update_err = self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), &monitor_update);
-                                               let chan_id = chan.get().channel_id();
-                                               match (update_err,
-                                                       handle_monitor_update_res!(self, update_err, chan,
-                                                               RAACommitmentOrder::CommitmentFirst, false, true))
-                                               {
-                                                       (ChannelMonitorUpdateStatus::PermanentFailure, Err(e)) => break Err(e),
-                                                       (ChannelMonitorUpdateStatus::Completed, Ok(())) => {},
-                                                       (ChannelMonitorUpdateStatus::InProgress, Err(_)) => {
-                                                               // Note that MonitorUpdateInProgress here indicates (per function
-                                                               // docs) that we will resend the commitment update once monitor
-                                                               // updating completes. Therefore, we must return an error
-                                                               // indicating that it is unsafe to retry the payment wholesale,
-                                                               // which we do in the send_payment check for
-                                                               // MonitorUpdateInProgress, below.
-                                                               return Err(APIError::MonitorUpdateInProgress);
-                                                       },
-                                                       _ => unreachable!(),
+                               if !chan.get().is_live() {
+                                       return Err(APIError::ChannelUnavailable{err: "Peer for first hop currently disconnected".to_owned()});
+                               }
+                               let funding_txo = chan.get().get_funding_txo().unwrap();
+                               let send_res = chan.get_mut().send_htlc_and_commit(htlc_msat, payment_hash.clone(),
+                                       htlc_cltv, HTLCSource::OutboundRoute {
+                                               path: path.clone(),
+                                               session_priv: session_priv.clone(),
+                                               first_hop_htlc_msat: htlc_msat,
+                                               payment_id,
+                                               payment_secret: payment_secret.clone(),
+                                               payment_params: payment_params.clone(),
+                                       }, onion_packet, &self.logger);
+                               match break_chan_entry!(self, send_res, chan) {
+                                       Some(monitor_update) => {
+                                               let update_id = monitor_update.update_id;
+                                               let update_res = self.chain_monitor.update_channel(funding_txo, monitor_update);
+                                               if let Err(e) = handle_new_monitor_update!(self, update_res, update_id, peer_state_lock, peer_state, chan) {
+                                                       break Err(e);
+                                               }
+                                               if update_res == ChannelMonitorUpdateStatus::InProgress {
+                                                       // Note that MonitorUpdateInProgress here indicates (per function
+                                                       // docs) that we will resend the commitment update once monitor
+                                                       // updating completes. Therefore, we must return an error
+                                                       // indicating that it is unsafe to retry the payment wholesale,
+                                                       // which we do in the send_payment check for
+                                                       // MonitorUpdateInProgress, below.
+                                                       return Err(APIError::MonitorUpdateInProgress);
                                                }
-
-                                               log_debug!(self.logger, "Sending payment along path resulted in a commitment_signed for channel {}", log_bytes!(chan_id));
-                                               peer_state.pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs {
-                                                       node_id: path.first().unwrap().pubkey,
-                                                       updates: msgs::CommitmentUpdate {
-                                                               update_add_htlcs: vec![update_add],
-                                                               update_fulfill_htlcs: Vec::new(),
-                                                               update_fail_htlcs: Vec::new(),
-                                                               update_fail_malformed_htlcs: Vec::new(),
-                                                               update_fee: None,
-                                                               commitment_signed,
-                                                       },
-                                               });
                                        },
                                        None => { },
                                }
@@ -3957,7 +3967,6 @@ where
 
                let per_peer_state = self.per_peer_state.read().unwrap();
                let chan_id = prev_hop.outpoint.to_channel_id();
-
                let counterparty_node_id_opt = match self.short_to_chan_info.read().unwrap().get(&prev_hop.short_channel_id) {
                        Some((cp_id, _dup_chan_id)) => Some(cp_id.clone()),
                        None => None
@@ -3969,99 +3978,57 @@ where
                        )
                ).unwrap_or(None);
 
-               if let Some(hash_map::Entry::Occupied(mut chan)) = peer_state_opt.as_mut().map(|peer_state| peer_state.channel_by_id.entry(chan_id))
-               {
-                       let counterparty_node_id = chan.get().get_counterparty_node_id();
-                       match chan.get_mut().get_update_fulfill_htlc_and_commit(prev_hop.htlc_id, payment_preimage, &self.logger) {
-                               Ok(msgs_monitor_option) => {
-                                       if let UpdateFulfillCommitFetch::NewClaim { msgs, htlc_value_msat, monitor_update } = msgs_monitor_option {
-                                               match self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), &monitor_update) {
-                                                       ChannelMonitorUpdateStatus::Completed => {},
-                                                       e => {
-                                                               log_given_level!(self.logger, if e == ChannelMonitorUpdateStatus::PermanentFailure { Level::Error } else { Level::Debug },
-                                                                       "Failed to update channel monitor with preimage {:?}: {:?}",
-                                                                       payment_preimage, e);
-                                                               let err = handle_monitor_update_res!(self, e, chan, RAACommitmentOrder::CommitmentFirst, false, msgs.is_some()).unwrap_err();
-                                                               mem::drop(peer_state_opt);
-                                                               mem::drop(per_peer_state);
-                                                               self.handle_monitor_update_completion_actions(completion_action(Some(htlc_value_msat)));
-                                                               return Err((counterparty_node_id, err));
-                                                       }
-                                               }
-                                               if let Some((msg, commitment_signed)) = msgs {
-                                                       log_debug!(self.logger, "Claiming funds for HTLC with preimage {} resulted in a commitment_signed for channel {}",
-                                                               log_bytes!(payment_preimage.0), log_bytes!(chan.get().channel_id()));
-                                                       peer_state_opt.as_mut().unwrap().pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs {
-                                                               node_id: counterparty_node_id,
-                                                               updates: msgs::CommitmentUpdate {
-                                                                       update_add_htlcs: Vec::new(),
-                                                                       update_fulfill_htlcs: vec![msg],
-                                                                       update_fail_htlcs: Vec::new(),
-                                                                       update_fail_malformed_htlcs: Vec::new(),
-                                                                       update_fee: None,
-                                                                       commitment_signed,
-                                                               }
-                                                       });
-                                               }
-                                               mem::drop(peer_state_opt);
-                                               mem::drop(per_peer_state);
-                                               self.handle_monitor_update_completion_actions(completion_action(Some(htlc_value_msat)));
-                                               Ok(())
-                                       } else {
-                                               Ok(())
-                                       }
-                               },
-                               Err((e, monitor_update)) => {
-                                       match self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), &monitor_update) {
-                                               ChannelMonitorUpdateStatus::Completed => {},
-                                               e => {
-                                                       // TODO: This needs to be handled somehow - if we receive a monitor update
-                                                       // with a preimage we *must* somehow manage to propagate it to the upstream
-                                                       // channel, or we must have an ability to receive the same update and try
-                                                       // again on restart.
-                                                       log_given_level!(self.logger, if e == ChannelMonitorUpdateStatus::PermanentFailure { Level::Error } else { Level::Info },
-                                                               "Failed to update channel monitor with preimage {:?} immediately prior to force-close: {:?}",
-                                                               payment_preimage, e);
-                                               },
+               if let Some(mut peer_state_lock) = peer_state_opt.take() {
+                       let peer_state = &mut *peer_state_lock;
+                       if let hash_map::Entry::Occupied(mut chan) = peer_state.channel_by_id.entry(chan_id) {
+                               let counterparty_node_id = chan.get().get_counterparty_node_id();
+                               let fulfill_res = chan.get_mut().get_update_fulfill_htlc_and_commit(prev_hop.htlc_id, payment_preimage, &self.logger);
+
+                               if let UpdateFulfillCommitFetch::NewClaim { htlc_value_msat, monitor_update } = fulfill_res {
+                                       if let Some(action) = completion_action(Some(htlc_value_msat)) {
+                                               log_trace!(self.logger, "Tracking monitor update completion action for channel {}: {:?}",
+                                                       log_bytes!(chan_id), action);
+                                               peer_state.monitor_update_blocked_actions.entry(chan_id).or_insert(Vec::new()).push(action);
                                        }
-                                       let (drop, res) = convert_chan_err!(self, e, chan.get_mut(), &chan_id);
-                                       if drop {
-                                               chan.remove_entry();
+                                       let update_id = monitor_update.update_id;
+                                       let update_res = self.chain_monitor.update_channel(prev_hop.outpoint, monitor_update);
+                                       let res = handle_new_monitor_update!(self, update_res, update_id, peer_state_lock,
+                                               peer_state, chan);
+                                       if let Err(e) = res {
+                                               // TODO: This is a *critical* error - we probably updated the outbound edge
+                                               // of the HTLC's monitor with a preimage. We should retry this monitor
+                                               // update over and over again until morale improves.
+                                               log_error!(self.logger, "Failed to update channel monitor with preimage {:?}", payment_preimage);
+                                               return Err((counterparty_node_id, e));
                                        }
-                                       mem::drop(peer_state_opt);
-                                       mem::drop(per_peer_state);
-                                       self.handle_monitor_update_completion_actions(completion_action(None));
-                                       Err((counterparty_node_id, res))
-                               },
-                       }
-               } else {
-                       let preimage_update = ChannelMonitorUpdate {
-                               update_id: CLOSED_CHANNEL_UPDATE_ID,
-                               updates: vec![ChannelMonitorUpdateStep::PaymentPreimage {
-                                       payment_preimage,
-                               }],
-                       };
-                       // We update the ChannelMonitor on the backward link, after
-                       // receiving an `update_fulfill_htlc` from the forward link.
-                       let update_res = self.chain_monitor.update_channel(prev_hop.outpoint, &preimage_update);
-                       if update_res != ChannelMonitorUpdateStatus::Completed {
-                               // TODO: This needs to be handled somehow - if we receive a monitor update
-                               // with a preimage we *must* somehow manage to propagate it to the upstream
-                               // channel, or we must have an ability to receive the same event and try
-                               // again on restart.
-                               log_error!(self.logger, "Critical error: failed to update channel monitor with preimage {:?}: {:?}",
-                                       payment_preimage, update_res);
+                               }
+                               return Ok(());
                        }
-                       mem::drop(peer_state_opt);
-                       mem::drop(per_peer_state);
-                       // Note that we do process the completion action here. This totally could be a
-                       // duplicate claim, but we have no way of knowing without interrogating the
-                       // `ChannelMonitor` we've provided the above update to. Instead, note that `Event`s are
-                       // generally always allowed to be duplicative (and it's specifically noted in
-                       // `PaymentForwarded`).
-                       self.handle_monitor_update_completion_actions(completion_action(None));
-                       Ok(())
                }
+               let preimage_update = ChannelMonitorUpdate {
+                       update_id: CLOSED_CHANNEL_UPDATE_ID,
+                       updates: vec![ChannelMonitorUpdateStep::PaymentPreimage {
+                               payment_preimage,
+                       }],
+               };
+               // We update the ChannelMonitor on the backward link, after
+               // receiving an `update_fulfill_htlc` from the forward link.
+               let update_res = self.chain_monitor.update_channel(prev_hop.outpoint, &preimage_update);
+               if update_res != ChannelMonitorUpdateStatus::Completed {
+                       // TODO: This needs to be handled somehow - if we receive a monitor update
+                       // with a preimage we *must* somehow manage to propagate it to the upstream
+                       // channel, or we must have an ability to receive the same event and try
+                       // again on restart.
+                       log_error!(self.logger, "Critical error: failed to update channel monitor with preimage {:?}: {:?}",
+                               payment_preimage, update_res);
+               }
+               // Note that we do process the completion action here. This totally could be a
+               // duplicate claim, but we have no way of knowing without interrogating the
+               // `ChannelMonitor` we've provided the above update to. Instead, note that `Event`s are
+               // generally always allowed to be duplicative (and it's specifically noted in
+               // `PaymentForwarded`).
+               self.handle_monitor_update_completion_actions(completion_action(None));
+               Ok(())
        }
 
        fn finalize_claims(&self, sources: Vec<HTLCSource>) {
@@ -4132,6 +4099,14 @@ where
                pending_forwards: Vec<(PendingHTLCInfo, u64)>, funding_broadcastable: Option<Transaction>,
                channel_ready: Option<msgs::ChannelReady>, announcement_sigs: Option<msgs::AnnouncementSignatures>)
        -> Option<(u64, OutPoint, u128, Vec<(PendingHTLCInfo, u64)>)> {
+               log_trace!(self.logger, "Handling channel resumption for channel {} with {} RAA, {} commitment update, {} pending forwards, {}broadcasting funding, {} channel ready, {} announcement",
+                       log_bytes!(channel.channel_id()),
+                       if raa.is_some() { "an" } else { "no" },
+                       if commitment_update.is_some() { "a" } else { "no" }, pending_forwards.len(),
+                       if funding_broadcastable.is_some() { "" } else { "not " },
+                       if channel_ready.is_some() { "sending" } else { "without" },
+                       if announcement_sigs.is_some() { "sending" } else { "without" });
+
                let mut htlc_forwards = None;
 
                let counterparty_node_id = channel.get_counterparty_node_id();
@@ -4190,65 +4165,36 @@ where
        fn channel_monitor_updated(&self, funding_txo: &OutPoint, highest_applied_update_id: u64, counterparty_node_id: Option<&PublicKey>) {
                let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);
 
-               let htlc_forwards;
-               let (mut pending_failures, finalized_claims, counterparty_node_id) = {
-                       let counterparty_node_id = match counterparty_node_id {
-                               Some(cp_id) => cp_id.clone(),
-                               None => {
-                                       // TODO: Once we can rely on the counterparty_node_id from the
-                                       // monitor event, this and the id_to_peer map should be removed.
-                                       let id_to_peer = self.id_to_peer.lock().unwrap();
-                                       match id_to_peer.get(&funding_txo.to_channel_id()) {
-                                               Some(cp_id) => cp_id.clone(),
-                                               None => return,
-                                       }
-                               }
-                       };
-                       let per_peer_state = self.per_peer_state.read().unwrap();
-                       let mut peer_state_lock;
-                       let peer_state_mutex_opt = per_peer_state.get(&counterparty_node_id);
-                       if peer_state_mutex_opt.is_none() { return }
-                       peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap();
-                       let peer_state = &mut *peer_state_lock;
-                       let mut channel = {
-                               match peer_state.channel_by_id.entry(funding_txo.to_channel_id()){
-                                       hash_map::Entry::Occupied(chan) => chan,
-                                       hash_map::Entry::Vacant(_) => return,
+               let counterparty_node_id = match counterparty_node_id {
+                       Some(cp_id) => cp_id.clone(),
+                       None => {
+                               // TODO: Once we can rely on the counterparty_node_id from the
+                               // monitor event, this and the id_to_peer map should be removed.
+                               let id_to_peer = self.id_to_peer.lock().unwrap();
+                               match id_to_peer.get(&funding_txo.to_channel_id()) {
+                                       Some(cp_id) => cp_id.clone(),
+                                       None => return,
                                }
-                       };
-                       if !channel.get().is_awaiting_monitor_update() || channel.get().get_latest_monitor_update_id() != highest_applied_update_id {
-                               return;
                        }
-
-                       let updates = channel.get_mut().monitor_updating_restored(&self.logger, &self.node_signer, self.genesis_hash, &self.default_configuration, self.best_block.read().unwrap().height());
-                       let channel_update = if updates.channel_ready.is_some() && channel.get().is_usable() {
-                               // We only send a channel_update in the case where we are just now sending a
-                               // channel_ready and the channel is in a usable state. We may re-send a
-                               // channel_update later through the announcement_signatures process for public
-                               // channels, but there's no reason not to just inform our counterparty of our fees
-                               // now.
-                               if let Ok(msg) = self.get_channel_update_for_unicast(channel.get()) {
-                                       Some(events::MessageSendEvent::SendChannelUpdate {
-                                               node_id: channel.get().get_counterparty_node_id(),
-                                               msg,
-                                       })
-                               } else { None }
-                       } else { None };
-                       htlc_forwards = self.handle_channel_resumption(&mut peer_state.pending_msg_events, channel.get_mut(), updates.raa, updates.commitment_update, updates.order, updates.accepted_htlcs, updates.funding_broadcastable, updates.channel_ready, updates.announcement_sigs);
-                       if let Some(upd) = channel_update {
-                               peer_state.pending_msg_events.push(upd);
+               };
+               let per_peer_state = self.per_peer_state.read().unwrap();
+               let mut peer_state_lock;
+               let peer_state_mutex_opt = per_peer_state.get(&counterparty_node_id);
+               if peer_state_mutex_opt.is_none() { return }
+               peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap();
+               let peer_state = &mut *peer_state_lock;
+               let mut channel = {
+                       match peer_state.channel_by_id.entry(funding_txo.to_channel_id()){
+                               hash_map::Entry::Occupied(chan) => chan,
+                               hash_map::Entry::Vacant(_) => return,
                        }
-
-                       (updates.failed_htlcs, updates.finalized_claimed_htlcs, counterparty_node_id)
                };
-               if let Some(forwards) = htlc_forwards {
-                       self.forward_htlcs(&mut [forwards][..]);
-               }
-               self.finalize_claims(finalized_claims);
-               for failure in pending_failures.drain(..) {
-                       let receiver = HTLCDestination::NextHopChannel { node_id: Some(counterparty_node_id), channel_id: funding_txo.to_channel_id() };
-                       self.fail_htlc_backwards_internal(&failure.0, &failure.1, &failure.2, receiver);
+               log_trace!(self.logger, "ChannelMonitor updated to {}. Current highest is {}",
+                       highest_applied_update_id, channel.get().get_latest_monitor_update_id());
+               if !channel.get().is_awaiting_monitor_update() || channel.get().get_latest_monitor_update_id() != highest_applied_update_id {
+                       return;
                }
+               handle_monitor_update_completion!(self, highest_applied_update_id, peer_state_lock, peer_state, channel.get_mut());
        }
 
        /// Accepts a request to open a channel after a [`Event::OpenChannelRequest`].
@@ -4506,60 +4452,31 @@ where
        }
 
        fn internal_funding_created(&self, counterparty_node_id: &PublicKey, msg: &msgs::FundingCreated) -> Result<(), MsgHandleErrInternal> {
+               let best_block = *self.best_block.read().unwrap();
+
                let per_peer_state = self.per_peer_state.read().unwrap();
                let peer_state_mutex = per_peer_state.get(counterparty_node_id)
                        .ok_or_else(|| {
                                debug_assert!(false);
                                MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.temporary_channel_id)
                        })?;
-               let ((funding_msg, monitor, mut channel_ready), mut chan) = {
-                       let best_block = *self.best_block.read().unwrap();
-                       let mut peer_state_lock = peer_state_mutex.lock().unwrap();
-                       let peer_state = &mut *peer_state_lock;
+
+               let mut peer_state_lock = peer_state_mutex.lock().unwrap();
+               let peer_state = &mut *peer_state_lock;
+               let ((funding_msg, monitor), chan) =
                        match peer_state.channel_by_id.entry(msg.temporary_channel_id) {
                                hash_map::Entry::Occupied(mut chan) => {
                                        (try_chan_entry!(self, chan.get_mut().funding_created(msg, best_block, &self.signer_provider, &self.logger), chan), chan.remove())
                                },
                                hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.temporary_channel_id))
-                       }
-               };
-               // Because we have exclusive ownership of the channel here we can release the peer_state
-               // lock before watch_channel
-               match self.chain_monitor.watch_channel(monitor.get_funding_txo().0, monitor) {
-                       ChannelMonitorUpdateStatus::Completed => {},
-                       ChannelMonitorUpdateStatus::PermanentFailure => {
-                               // Note that we reply with the new channel_id in error messages if we gave up on the
-                               // channel, not the temporary_channel_id. This is compatible with ourselves, but the
-                               // spec is somewhat ambiguous here. Not a huge deal since we'll send error messages for
-                               // any messages referencing a previously-closed channel anyway.
-                               // We do not propagate the monitor update to the user as it would be for a monitor
-                               // that we didn't manage to store (and that we don't care about - we don't respond
-                               // with the funding_signed so the channel can never go on chain).
-                               let (_monitor_update, failed_htlcs) = chan.force_shutdown(false);
-                               assert!(failed_htlcs.is_empty());
-                               return Err(MsgHandleErrInternal::send_err_msg_no_close("ChannelMonitor storage failure".to_owned(), funding_msg.channel_id));
-                       },
-                       ChannelMonitorUpdateStatus::InProgress => {
-                               // There's no problem signing a counterparty's funding transaction if our monitor
-                               // hasn't persisted to disk yet - we can't lose money on a transaction that we haven't
-                               // accepted payment from yet. We do, however, need to wait to send our channel_ready
-                               // until we have persisted our monitor.
-                               chan.monitor_updating_paused(false, false, channel_ready.is_some(), Vec::new(), Vec::new(), Vec::new());
-                               channel_ready = None; // Don't send the channel_ready now
-                       },
-               }
-               // It's safe to unwrap as we've held the `per_peer_state` read lock since checking that the
-               // peer exists, despite the inner PeerState potentially having no channels after removing
-               // the channel above.
-               let mut peer_state_lock = peer_state_mutex.lock().unwrap();
-               let peer_state = &mut *peer_state_lock;
+                       };
+
                match peer_state.channel_by_id.entry(funding_msg.channel_id) {
                        hash_map::Entry::Occupied(_) => {
-                               return Err(MsgHandleErrInternal::send_err_msg_no_close("Already had channel with the new channel_id".to_owned(), funding_msg.channel_id))
+                               Err(MsgHandleErrInternal::send_err_msg_no_close("Already had channel with the new channel_id".to_owned(), funding_msg.channel_id))
                        },
                        hash_map::Entry::Vacant(e) => {
-                               let mut id_to_peer = self.id_to_peer.lock().unwrap();
-                               match id_to_peer.entry(chan.channel_id()) {
+                               match self.id_to_peer.lock().unwrap().entry(chan.channel_id()) {
                                        hash_map::Entry::Occupied(_) => {
                                                return Err(MsgHandleErrInternal::send_err_msg_no_close(
                                                        "The funding_created message had the same funding_txid as an existing channel - funding is not possible".to_owned(),
@@ -4569,63 +4486,66 @@ where
                                                i_e.insert(chan.get_counterparty_node_id());
                                        }
                                }
+
+                               // There's no problem signing a counterparty's funding transaction if our monitor
+                               // hasn't persisted to disk yet - we can't lose money on a transaction that we haven't
+                               // accepted payment from yet. We do, however, need to wait to send our channel_ready
+                               // until we have persisted our monitor.
+                               let new_channel_id = funding_msg.channel_id;
                                peer_state.pending_msg_events.push(events::MessageSendEvent::SendFundingSigned {
                                        node_id: counterparty_node_id.clone(),
                                        msg: funding_msg,
                                });
-                               if let Some(msg) = channel_ready {
-                                       send_channel_ready!(self, peer_state.pending_msg_events, chan, msg);
+
+                               let monitor_res = self.chain_monitor.watch_channel(monitor.get_funding_txo().0, monitor);
+
+                               let chan = e.insert(chan);
+                               let mut res = handle_new_monitor_update!(self, monitor_res, 0, peer_state_lock, peer_state, chan, MANUALLY_REMOVING, { peer_state.channel_by_id.remove(&new_channel_id) });
+
+                               // Note that we reply with the new channel_id in error messages if we gave up on the
+                               // channel, not the temporary_channel_id. This is compatible with ourselves, but the
+                               // spec is somewhat ambiguous here. Not a huge deal since we'll send error messages for
+                               // any messages referencing a previously-closed channel anyway.
+                               // We do not propagate the monitor update to the user as it would be for a monitor
+                               // that we didn't manage to store (and that we don't care about - we don't respond
+                               // with the funding_signed so the channel can never go on chain).
+                               if let Err(MsgHandleErrInternal { shutdown_finish: Some((res, _)), .. }) = &mut res {
+                                       res.0 = None;
                                }
-                               e.insert(chan);
+                               res
                        }
                }
-               Ok(())
        }
 
        fn internal_funding_signed(&self, counterparty_node_id: &PublicKey, msg: &msgs::FundingSigned) -> Result<(), MsgHandleErrInternal> {
-               let funding_tx = {
-                       let best_block = *self.best_block.read().unwrap();
-                       let per_peer_state = self.per_peer_state.read().unwrap();
-                       let peer_state_mutex = per_peer_state.get(counterparty_node_id)
-                               .ok_or_else(|| {
-                                       debug_assert!(false);
-                                       MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id)
-                               })?;
+               let best_block = *self.best_block.read().unwrap();
+               let per_peer_state = self.per_peer_state.read().unwrap();
+               let peer_state_mutex = per_peer_state.get(counterparty_node_id)
+                       .ok_or_else(|| {
+                               debug_assert!(false);
+                               MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id)
+                       })?;
 
-                       let mut peer_state_lock = peer_state_mutex.lock().unwrap();
-                       let peer_state = &mut *peer_state_lock;
-                       match peer_state.channel_by_id.entry(msg.channel_id) {
-                               hash_map::Entry::Occupied(mut chan) => {
-                                       let (monitor, funding_tx, channel_ready) = match chan.get_mut().funding_signed(&msg, best_block, &self.signer_provider, &self.logger) {
-                                               Ok(update) => update,
-                                               Err(e) => try_chan_entry!(self, Err(e), chan),
-                                       };
-                                       match self.chain_monitor.watch_channel(chan.get().get_funding_txo().unwrap(), monitor) {
-                                               ChannelMonitorUpdateStatus::Completed => {},
-                                               e => {
-                                                       let mut res = handle_monitor_update_res!(self, e, chan, RAACommitmentOrder::RevokeAndACKFirst, channel_ready.is_some(), OPTIONALLY_RESEND_FUNDING_LOCKED);
-                                                       if let Err(MsgHandleErrInternal { ref mut shutdown_finish, .. }) = res {
-                                                               // We weren't able to watch the channel to begin with, so no updates should be made on
-                                                               // it. Previously, full_stack_target found an (unreachable) panic when the
-                                                               // monitor update contained within `shutdown_finish` was applied.
-                                                               if let Some((ref mut shutdown_finish, _)) = shutdown_finish {
-                                                                       shutdown_finish.0.take();
-                                                               }
-                                                       }
-                                                       return res
-                                               },
-                                       }
-                                       if let Some(msg) = channel_ready {
-                                               send_channel_ready!(self, peer_state.pending_msg_events, chan.get(), msg);
+               let mut peer_state_lock = peer_state_mutex.lock().unwrap();
+               let peer_state = &mut *peer_state_lock;
+               match peer_state.channel_by_id.entry(msg.channel_id) {
+                       hash_map::Entry::Occupied(mut chan) => {
+                               let monitor = try_chan_entry!(self,
+                                       chan.get_mut().funding_signed(&msg, best_block, &self.signer_provider, &self.logger), chan);
+                               let update_res = self.chain_monitor.watch_channel(chan.get().get_funding_txo().unwrap(), monitor);
+                               let mut res = handle_new_monitor_update!(self, update_res, 0, peer_state_lock, peer_state, chan);
+                               if let Err(MsgHandleErrInternal { ref mut shutdown_finish, .. }) = res {
+                                       // We weren't able to watch the channel to begin with, so no updates should be made on
+                                       // it. Previously, full_stack_target found an (unreachable) panic when the
+                                       // monitor update contained within `shutdown_finish` was applied.
+                                       if let Some((ref mut shutdown_finish, _)) = shutdown_finish {
+                                               shutdown_finish.0.take();
                                        }
-                                       funding_tx
-                               },
-                               hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id))
-                       }
-               };
-               log_info!(self.logger, "Broadcasting funding transaction with txid {}", funding_tx.txid());
-               self.tx_broadcaster.broadcast_transaction(&funding_tx);
-               Ok(())
+                               }
+                               res
+                       },
+                       hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel".to_owned(), msg.channel_id))
+               }
        }
 
        fn internal_channel_ready(&self, counterparty_node_id: &PublicKey, msg: &msgs::ChannelReady) -> Result<(), MsgHandleErrInternal> {
@@ -4690,27 +4610,27 @@ where
                                                        if chan_entry.get().sent_shutdown() { " after we initiated shutdown" } else { "" });
                                        }
 
-                                       let (shutdown, monitor_update, htlcs) = try_chan_entry!(self, chan_entry.get_mut().shutdown(&self.signer_provider, &peer_state.latest_features, &msg), chan_entry);
+                                       let funding_txo_opt = chan_entry.get().get_funding_txo();
+                                       let (shutdown, monitor_update_opt, htlcs) = try_chan_entry!(self,
+                                               chan_entry.get_mut().shutdown(&self.signer_provider, &peer_state.latest_features, &msg), chan_entry);
                                        dropped_htlcs = htlcs;
 
-                                       // Update the monitor with the shutdown script if necessary.
-                                       if let Some(monitor_update) = monitor_update {
-                                               let update_res = self.chain_monitor.update_channel(chan_entry.get().get_funding_txo().unwrap(), &monitor_update);
-                                               let (result, is_permanent) =
-                                                       handle_monitor_update_res!(self, update_res, chan_entry.get_mut(), RAACommitmentOrder::CommitmentFirst, chan_entry.key(), NO_UPDATE);
-                                               if is_permanent {
-                                                       remove_channel!(self, chan_entry);
-                                                       break result;
-                                               }
-                                       }
-
                                        if let Some(msg) = shutdown {
+                                               // We can send the `shutdown` message before updating the `ChannelMonitor`
+                                               // here as we don't need the monitor update to complete until we send a
+                                               // `shutdown_signed`, which we'll delay if we're pending a monitor update.
                                                peer_state.pending_msg_events.push(events::MessageSendEvent::SendShutdown {
                                                        node_id: *counterparty_node_id,
                                                        msg,
                                                });
                                        }
 
+                                       // Update the monitor with the shutdown script if necessary.
+                                       if let Some(monitor_update) = monitor_update_opt {
+                                               let update_id = monitor_update.update_id;
+                                               let update_res = self.chain_monitor.update_channel(funding_txo_opt.unwrap(), monitor_update);
+                                               break handle_new_monitor_update!(self, update_res, update_id, peer_state_lock, peer_state, chan_entry);
+                                       }
                                        break Ok(());
                                },
                                hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id))
@@ -4722,8 +4642,7 @@ where
                        self.fail_htlc_backwards_internal(&htlc_source.0, &htlc_source.1, &reason, receiver);
                }
 
-               let _ = handle_error!(self, result, *counterparty_node_id);
-               Ok(())
+               result
        }
 
        fn internal_closing_signed(&self, counterparty_node_id: &PublicKey, msg: &msgs::ClosingSigned) -> Result<(), MsgHandleErrInternal> {
@@ -4897,40 +4816,12 @@ where
                let peer_state = &mut *peer_state_lock;
                match peer_state.channel_by_id.entry(msg.channel_id) {
                        hash_map::Entry::Occupied(mut chan) => {
-                               let (revoke_and_ack, commitment_signed, monitor_update) =
-                                       match chan.get_mut().commitment_signed(&msg, &self.logger) {
-                                               Err((None, e)) => try_chan_entry!(self, Err(e), chan),
-                                               Err((Some(update), e)) => {
-                                                       assert!(chan.get().is_awaiting_monitor_update());
-                                                       let _ = self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), &update);
-                                                       try_chan_entry!(self, Err(e), chan);
-                                                       unreachable!();
-                                               },
-                                               Ok(res) => res
-                                       };
-                               let update_res = self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), &monitor_update);
-                               if let Err(e) = handle_monitor_update_res!(self, update_res, chan, RAACommitmentOrder::RevokeAndACKFirst, true, commitment_signed.is_some()) {
-                                       return Err(e);
-                               }
-
-                               peer_state.pending_msg_events.push(events::MessageSendEvent::SendRevokeAndACK {
-                                       node_id: counterparty_node_id.clone(),
-                                       msg: revoke_and_ack,
-                               });
-                               if let Some(msg) = commitment_signed {
-                                       peer_state.pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs {
-                                               node_id: counterparty_node_id.clone(),
-                                               updates: msgs::CommitmentUpdate {
-                                                       update_add_htlcs: Vec::new(),
-                                                       update_fulfill_htlcs: Vec::new(),
-                                                       update_fail_htlcs: Vec::new(),
-                                                       update_fail_malformed_htlcs: Vec::new(),
-                                                       update_fee: None,
-                                                       commitment_signed: msg,
-                                               },
-                                       });
-                               }
-                               Ok(())
+                               let funding_txo = chan.get().get_funding_txo();
+                               let monitor_update = try_chan_entry!(self, chan.get_mut().commitment_signed(&msg, &self.logger), chan);
+                               let update_res = self.chain_monitor.update_channel(funding_txo.unwrap(), monitor_update);
+                               let update_id = monitor_update.update_id;
+                               handle_new_monitor_update!(self, update_res, update_id, peer_state_lock,
+                                       peer_state, chan)
                        },
                        hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id))
                }
@@ -5034,8 +4925,7 @@ where
        }
 
        fn internal_revoke_and_ack(&self, counterparty_node_id: &PublicKey, msg: &msgs::RevokeAndACK) -> Result<(), MsgHandleErrInternal> {
-               let mut htlcs_to_fail = Vec::new();
-               let res = loop {
+               let (htlcs_to_fail, res) = {
                        let per_peer_state = self.per_peer_state.read().unwrap();
                        let peer_state_mutex = per_peer_state.get(counterparty_node_id)
                                .ok_or_else(|| {
@@ -5046,59 +4936,19 @@ where
                        let peer_state = &mut *peer_state_lock;
                        match peer_state.channel_by_id.entry(msg.channel_id) {
                                hash_map::Entry::Occupied(mut chan) => {
-                                       let was_paused_for_mon_update = chan.get().is_awaiting_monitor_update();
-                                       let raa_updates = break_chan_entry!(self,
-                                               chan.get_mut().revoke_and_ack(&msg, &self.logger), chan);
-                                       htlcs_to_fail = raa_updates.holding_cell_failed_htlcs;
-                                       let update_res = self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), &raa_updates.monitor_update);
-                                       if was_paused_for_mon_update {
-                                               assert!(update_res != ChannelMonitorUpdateStatus::Completed);
-                                               assert!(raa_updates.commitment_update.is_none());
-                                               assert!(raa_updates.accepted_htlcs.is_empty());
-                                               assert!(raa_updates.failed_htlcs.is_empty());
-                                               assert!(raa_updates.finalized_claimed_htlcs.is_empty());
-                                               break Err(MsgHandleErrInternal::ignore_no_close("Existing pending monitor update prevented responses to RAA".to_owned()));
-                                       }
-                                       if update_res != ChannelMonitorUpdateStatus::Completed {
-                                               if let Err(e) = handle_monitor_update_res!(self, update_res, chan,
-                                                               RAACommitmentOrder::CommitmentFirst, false,
-                                                               raa_updates.commitment_update.is_some(), false,
-                                                               raa_updates.accepted_htlcs, raa_updates.failed_htlcs,
-                                                               raa_updates.finalized_claimed_htlcs) {
-                                                       break Err(e);
-                                               } else { unreachable!(); }
-                                       }
-                                       if let Some(updates) = raa_updates.commitment_update {
-                                               peer_state.pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs {
-                                                       node_id: counterparty_node_id.clone(),
-                                                       updates,
-                                               });
-                                       }
-                                       break Ok((raa_updates.accepted_htlcs, raa_updates.failed_htlcs,
-                                                       raa_updates.finalized_claimed_htlcs,
-                                                       chan.get().get_short_channel_id()
-                                                               .unwrap_or(chan.get().outbound_scid_alias()),
-                                                       chan.get().get_funding_txo().unwrap(),
-                                                       chan.get().get_user_id()))
+                                       let funding_txo = chan.get().get_funding_txo();
+                                       let (htlcs_to_fail, monitor_update) = try_chan_entry!(self, chan.get_mut().revoke_and_ack(&msg, &self.logger), chan);
+                                       let update_res = self.chain_monitor.update_channel(funding_txo.unwrap(), monitor_update);
+                                       let update_id = monitor_update.update_id;
+                                       let res = handle_new_monitor_update!(self, update_res, update_id, peer_state_lock,
+                                               peer_state, chan);
+                                       (htlcs_to_fail, res)
                                },
-                               hash_map::Entry::Vacant(_) => break Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id))
+                               hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id))
                        }
                };
                self.fail_holding_cell_htlcs(htlcs_to_fail, msg.channel_id, counterparty_node_id);
-               match res {
-                       Ok((pending_forwards, mut pending_failures, finalized_claim_htlcs,
-                               short_channel_id, channel_outpoint, user_channel_id)) =>
-                       {
-                               for failure in pending_failures.drain(..) {
-                                       let receiver = HTLCDestination::NextHopChannel { node_id: Some(*counterparty_node_id), channel_id: channel_outpoint.to_channel_id() };
-                                       self.fail_htlc_backwards_internal(&failure.0, &failure.1, &failure.2, receiver);
-                               }
-                               self.forward_htlcs(&mut [(short_channel_id, channel_outpoint, user_channel_id, pending_forwards)]);
-                               self.finalize_claims(finalized_claim_htlcs);
-                               Ok(())
-                       },
-                       Err(e) => Err(e)
-               }
+               res
        }
 
        fn internal_update_fee(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateFee) -> Result<(), MsgHandleErrInternal> {
@@ -5340,49 +5190,37 @@ where
                let mut has_monitor_update = false;
                let mut failed_htlcs = Vec::new();
                let mut handle_errors = Vec::new();
-               {
-                       let per_peer_state = self.per_peer_state.read().unwrap();
+               let per_peer_state = self.per_peer_state.read().unwrap();
 
-                       for (_cp_id, peer_state_mutex) in per_peer_state.iter() {
+               for (_cp_id, peer_state_mutex) in per_peer_state.iter() {
+                       'chan_loop: loop {
                                let mut peer_state_lock = peer_state_mutex.lock().unwrap();
-                               let peer_state = &mut *peer_state_lock;
-                               let pending_msg_events = &mut peer_state.pending_msg_events;
-                               peer_state.channel_by_id.retain(|channel_id, chan| {
-                                       match chan.maybe_free_holding_cell_htlcs(&self.logger) {
-                                               Ok((commitment_opt, holding_cell_failed_htlcs)) => {
-                                                       if !holding_cell_failed_htlcs.is_empty() {
-                                                               failed_htlcs.push((
-                                                                       holding_cell_failed_htlcs,
-                                                                       *channel_id,
-                                                                       chan.get_counterparty_node_id()
-                                                               ));
-                                                       }
-                                                       if let Some((commitment_update, monitor_update)) = commitment_opt {
-                                                               match self.chain_monitor.update_channel(chan.get_funding_txo().unwrap(), &monitor_update) {
-                                                                       ChannelMonitorUpdateStatus::Completed => {
-                                                                               pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs {
-                                                                                       node_id: chan.get_counterparty_node_id(),
-                                                                                       updates: commitment_update,
-                                                                               });
-                                                                       },
-                                                                       e => {
-                                                                               has_monitor_update = true;
-                                                                               let (res, close_channel) = handle_monitor_update_res!(self, e, chan, RAACommitmentOrder::CommitmentFirst, channel_id, COMMITMENT_UPDATE_ONLY);
-                                                                               handle_errors.push((chan.get_counterparty_node_id(), res));
-                                                                               if close_channel { return false; }
-                                                                       },
-                                                               }
-                                                       }
-                                                       true
-                                               },
-                                               Err(e) => {
-                                                       let (close_channel, res) = convert_chan_err!(self, e, chan, channel_id);
-                                                       handle_errors.push((chan.get_counterparty_node_id(), Err(res)));
-                                                       // ChannelClosed event is generated by handle_error for us
-                                                       !close_channel
+                               let peer_state: &mut PeerState<_> = &mut *peer_state_lock;
+                               for (channel_id, chan) in peer_state.channel_by_id.iter_mut() {
+                                       let counterparty_node_id = chan.get_counterparty_node_id();
+                                       let funding_txo = chan.get_funding_txo();
+                                       let (monitor_opt, holding_cell_failed_htlcs) =
+                                               chan.maybe_free_holding_cell_htlcs(&self.logger);
+                                       if !holding_cell_failed_htlcs.is_empty() {
+                                               failed_htlcs.push((holding_cell_failed_htlcs, *channel_id, counterparty_node_id));
+                                       }
+                                       if let Some(monitor_update) = monitor_opt {
+                                               has_monitor_update = true;
+
+                                               let update_res = self.chain_monitor.update_channel(
+                                                       funding_txo.expect("channel is live"), monitor_update);
+                                               let update_id = monitor_update.update_id;
+                                               let channel_id: [u8; 32] = *channel_id;
+                                               let res = handle_new_monitor_update!(self, update_res, update_id,
+                                                       peer_state_lock, peer_state, chan, MANUALLY_REMOVING,
+                                                       peer_state.channel_by_id.remove(&channel_id));
+                                               if res.is_err() {
+                                                       handle_errors.push((counterparty_node_id, res));
                                                }
+                                               continue 'chan_loop;
                                        }
-                               });
+                               }
+                               break 'chan_loop;
                        }
                }
 
@@ -6432,6 +6270,7 @@ where
                                                channel_by_id: HashMap::new(),
                                                latest_features: init_msg.features.clone(),
                                                pending_msg_events: Vec::new(),
+                                               monitor_update_blocked_actions: BTreeMap::new(),
                                                is_connected: true,
                                        }));
                                },
@@ -7069,10 +6908,14 @@ where
                        htlc_purposes.push(purpose);
                }
 
+               let mut monitor_update_blocked_actions_per_peer = None;
+               let mut peer_states = Vec::new();
+               for (_, peer_state_mutex) in per_peer_state.iter() {
+                       peer_states.push(peer_state_mutex.lock().unwrap());
+               }
+
                (serializable_peer_count).write(writer)?;
-               for (peer_pubkey, peer_state_mutex) in per_peer_state.iter() {
-                       let peer_state_lock = peer_state_mutex.lock().unwrap();
-                       let peer_state = &*peer_state_lock;
+               for ((peer_pubkey, _), peer_state) in per_peer_state.iter().zip(peer_states.iter()) {
                        // Peers which we have no channels to should be dropped once disconnected. As we
                        // disconnect all peers when shutting down and serializing the ChannelManager, we
                        // consider all peers as disconnected here. There's therefore no need write peers with
@@ -7080,6 +6923,11 @@ where
                        if !peer_state.ok_to_remove(false) {
                                peer_pubkey.write(writer)?;
                                peer_state.latest_features.write(writer)?;
+                               if !peer_state.monitor_update_blocked_actions.is_empty() {
+                                       monitor_update_blocked_actions_per_peer
+                                               .get_or_insert_with(Vec::new)
+                                               .push((*peer_pubkey, &peer_state.monitor_update_blocked_actions));
+                               }
                        }
                }
 
@@ -7157,8 +7005,6 @@ where
                        // LDK versions prior to 0.0.113 do not know how to read the pending claimed payments
                        // map. Thus, if there are no entries we skip writing a TLV for it.
                        pending_claiming_payments = None;
-               } else {
-                       debug_assert!(false, "While we have code to serialize pending_claiming_payments, the map should always be empty until a later PR");
                }
 
                write_tlv_fields!(writer, {
@@ -7167,6 +7013,7 @@ where
                        (3, pending_outbound_payments, required),
                        (4, pending_claiming_payments, option),
                        (5, self.our_network_pubkey, required),
+                       (6, monitor_update_blocked_actions_per_peer, option),
                        (7, self.fake_scid_rand_bytes, required),
                        (9, htlc_purposes, vec_type),
                        (11, self.probing_cookie_secret, required),
@@ -7479,6 +7326,7 @@ where
                                channel_by_id: peer_channels.remove(&peer_pubkey).unwrap_or(HashMap::new()),
                                latest_features: Readable::read(reader)?,
                                pending_msg_events: Vec::new(),
+                               monitor_update_blocked_actions: BTreeMap::new(),
                                is_connected: false,
                        };
                        per_peer_state.insert(peer_pubkey, Mutex::new(peer_state));
@@ -7535,12 +7383,14 @@ where
                let mut probing_cookie_secret: Option<[u8; 32]> = None;
                let mut claimable_htlc_purposes = None;
                let mut pending_claiming_payments = Some(HashMap::new());
+               let mut monitor_update_blocked_actions_per_peer = Some(Vec::new());
                read_tlv_fields!(reader, {
                        (1, pending_outbound_payments_no_retry, option),
                        (2, pending_intercepted_htlcs, option),
                        (3, pending_outbound_payments, option),
                        (4, pending_claiming_payments, option),
                        (5, received_network_pubkey, option),
+                       (6, monitor_update_blocked_actions_per_peer, option),
                        (7, fake_scid_rand_bytes, option),
                        (9, claimable_htlc_purposes, vec_type),
                        (11, probing_cookie_secret, option),
@@ -7807,6 +7657,15 @@ where
                        }
                }
 
+               for (node_id, monitor_update_blocked_actions) in monitor_update_blocked_actions_per_peer.unwrap() {
+                       if let Some(peer_state) = per_peer_state.get_mut(&node_id) {
+                               peer_state.lock().unwrap().monitor_update_blocked_actions = monitor_update_blocked_actions;
+                       } else {
+                               log_error!(args.logger, "Got blocked actions without a per-peer-state for {}", node_id);
+                               return Err(DecodeError::InvalidValue);
+                       }
+               }
+
                let channel_manager = ChannelManager {
                        genesis_hash,
                        fee_estimator: bounded_fee_estimator,
index 65f27da13559ac1d4c067aafc694540894acdc07..d6d2972d0736e3ec642fccee7363e44329560584 100644 (file)
@@ -3618,22 +3618,22 @@ fn test_simple_peer_disconnect() {
                        _ => panic!("Unexpected event"),
                }
                match events[1] {
+                       Event::PaymentPathSuccessful { .. } => {},
+                       _ => panic!("Unexpected event"),
+               }
+               match events[2] {
                        Event::PaymentPathFailed { payment_hash, payment_failed_permanently, .. } => {
                                assert_eq!(payment_hash, payment_hash_5);
                                assert!(payment_failed_permanently);
                        },
                        _ => panic!("Unexpected event"),
                }
-               match events[2] {
+               match events[3] {
                        Event::PaymentFailed { payment_hash, .. } => {
                                assert_eq!(payment_hash, payment_hash_5);
                        },
                        _ => panic!("Unexpected event"),
                }
-               match events[3] {
-                       Event::PaymentPathSuccessful { .. } => {},
-                       _ => panic!("Unexpected event"),
-               }
        }
 
        claim_payment(&nodes[0], &vec!(&nodes[1], &nodes[2]), payment_preimage_4);
@@ -8186,7 +8186,7 @@ fn test_update_err_monitor_lockdown() {
                let mut node_0_per_peer_lock;
                let mut node_0_peer_state_lock;
                let mut channel = get_channel_ref!(nodes[0], nodes[1], node_0_per_peer_lock, node_0_peer_state_lock, chan_1.2);
-               if let Ok((_, _, update)) = channel.commitment_signed(&updates.commitment_signed, &node_cfgs[0].logger) {
+               if let Ok(update) = channel.commitment_signed(&updates.commitment_signed, &node_cfgs[0].logger) {
                        assert_eq!(watchtower.chain_monitor.update_channel(outpoint, &update), ChannelMonitorUpdateStatus::PermanentFailure);
                        assert_eq!(nodes[0].chain_monitor.update_channel(outpoint, &update), ChannelMonitorUpdateStatus::Completed);
                } else { assert!(false); }
@@ -8280,7 +8280,7 @@ fn test_concurrent_monitor_claim() {
                let mut node_0_per_peer_lock;
                let mut node_0_peer_state_lock;
                let mut channel = get_channel_ref!(nodes[0], nodes[1], node_0_per_peer_lock, node_0_peer_state_lock, chan_1.2);
-               if let Ok((_, _, update)) = channel.commitment_signed(&updates.commitment_signed, &node_cfgs[0].logger) {
+               if let Ok(update) = channel.commitment_signed(&updates.commitment_signed, &node_cfgs[0].logger) {
                        // Watchtower Alice should already have seen the block and reject the update
                        assert_eq!(watchtower_alice.chain_monitor.update_channel(outpoint, &update), ChannelMonitorUpdateStatus::PermanentFailure);
                        assert_eq!(watchtower_bob.chain_monitor.update_channel(outpoint, &update), ChannelMonitorUpdateStatus::Completed);
@@ -8736,9 +8736,9 @@ fn test_duplicate_chan_id() {
        };
        check_added_monitors!(nodes[0], 0);
        nodes[1].node.handle_funding_created(&nodes[0].node.get_our_node_id(), &funding_created);
-       // At this point we'll try to add a duplicate channel monitor, which will be rejected, but
-       // still needs to be cleared here.
-       check_added_monitors!(nodes[1], 1);
+       // At this point we'll look up if the channel_id is present and immediately fail the channel
+       // without trying to persist the `ChannelMonitor`.
+       check_added_monitors!(nodes[1], 0);
 
        // ...still, nodes[1] will reject the duplicate channel.
        {
index 249051c40ba337620aad601d7b09e232dab34b79..96a04f417d031978c1a7f6116cf04e133c225d11 100644 (file)
@@ -258,7 +258,7 @@ fn no_pending_leak_on_initial_send_failure() {
 
        unwrap_send_err!(nodes[0].node.send_payment(&route, payment_hash, &Some(payment_secret), PaymentId(payment_hash.0)),
                true, APIError::ChannelUnavailable { ref err },
-               assert_eq!(err, "Peer for first hop currently disconnected/pending monitor update!"));
+               assert_eq!(err, "Peer for first hop currently disconnected"));
 
        assert!(!nodes[0].node.has_pending_payments());
 }
index 928cc61946e46f358a70e82672b06bda1dec4867..847005e324e60f5a0e2e42d67e31009571b27e2b 100644 (file)
@@ -770,6 +770,7 @@ impl Readable for Vec<u8> {
 }
 
 impl_for_vec!(ecdsa::Signature);
+impl_for_vec!(crate::ln::channelmanager::MonitorUpdateCompletionAction);
 impl_for_vec!((A, B), A, B);
 
 impl Writeable for Script {