Extend MsgHandleErrInternal with a new chan_id field Option<[u8; 32]>
[rust-lightning] / lightning / src / ln / channelmanager.rs
index 888724c0f064a82f885203ef0f7a0d76b7f885e8..dccca13ff978bb59f17d45f04c0a3dd5eece23ad 100644 (file)
@@ -37,13 +37,12 @@ use bitcoin::secp256k1;
 
 use chain;
 use chain::{Confirm, Watch, BestBlock};
-use chain::chaininterface::{BroadcasterInterface, FeeEstimator};
+use chain::chaininterface::{BroadcasterInterface, ConfirmationTarget, FeeEstimator};
 use chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, ChannelMonitorUpdateStep, ChannelMonitorUpdateErr, HTLC_FAIL_BACK_BUFFER, CLTV_CLAIM_BUFFER, LATENCY_GRACE_PERIOD_BLOCKS, ANTI_REORG_DELAY, MonitorEvent, CLOSED_CHANNEL_UPDATE_ID};
 use chain::transaction::{OutPoint, TransactionData};
 // Since this struct is returned in `list_channels` methods, expose it here in case users want to
 // construct one themselves.
 use ln::{PaymentHash, PaymentPreimage, PaymentSecret};
-pub use ln::channel::CounterpartyForwardingInfo;
 use ln::channel::{Channel, ChannelError, ChannelUpdateStatus, UpdateFulfillCommitFetch};
 use ln::features::{InitFeatures, NodeFeatures};
 use routing::router::{Route, RouteHop};
@@ -60,17 +59,17 @@ use util::chacha20::{ChaCha20, ChaChaReader};
 use util::logger::{Logger, Level};
 use util::errors::APIError;
 
+use io;
 use prelude::*;
 use core::{cmp, mem};
 use core::cell::RefCell;
-use std::io::{Cursor, Read};
+use io::{Cursor, Read};
 use sync::{Arc, Condvar, Mutex, MutexGuard, RwLock, RwLockReadGuard};
 use core::sync::atomic::{AtomicUsize, Ordering};
 use core::time::Duration;
 #[cfg(any(test, feature = "allow_wallclock_use"))]
 use std::time::Instant;
 use core::ops::Deref;
-use bitcoin::hashes::hex::ToHex;
 
 // We hold various information about HTLC relay in the HTLC objects in Channel itself:
 //
@@ -173,6 +172,22 @@ struct ClaimableHTLC {
        onion_payload: OnionPayload,
 }
 
+/// A payment identifier used to correlate an MPP payment's per-path HTLC sources internally.
+#[derive(Hash, Copy, Clone, PartialEq, Eq, Debug)]
+pub(crate) struct MppId(pub [u8; 32]);
+
+impl Writeable for MppId {
+       fn write<W: Writer>(&self, w: &mut W) -> Result<(), io::Error> {
+               self.0.write(w)
+       }
+}
+
+impl Readable for MppId {
+       fn read<R: Read>(r: &mut R) -> Result<Self, DecodeError> {
+               let buf: [u8; 32] = Readable::read(r)?;
+               Ok(MppId(buf))
+       }
+}
 /// Tracks the inbound corresponding to an outbound HTLC
 #[derive(Clone, PartialEq)]
 pub(crate) enum HTLCSource {
@@ -183,6 +198,7 @@ pub(crate) enum HTLCSource {
                /// Technically we can recalculate this from the route, but we cache it here to avoid
                /// doing a double-pass on route when we get a failure back
                first_hop_htlc_msat: u64,
+               mpp_id: MppId,
        },
 }
 #[cfg(test)]
@@ -192,6 +208,7 @@ impl HTLCSource {
                        path: Vec::new(),
                        session_priv: SecretKey::from_slice(&[1; 32]).unwrap(),
                        first_hop_htlc_msat: 0,
+                       mpp_id: MppId([2; 32]),
                }
        }
 }
@@ -225,6 +242,7 @@ type ShutdownResult = (Option<(OutPoint, ChannelMonitorUpdate)>, Vec<(HTLCSource
 
 struct MsgHandleErrInternal {
        err: msgs::LightningError,
+       chan_id: Option<[u8; 32]>,
        shutdown_finish: Option<(ShutdownResult, Option<msgs::ChannelUpdate>)>,
 }
 impl MsgHandleErrInternal {
@@ -240,6 +258,7 @@ impl MsgHandleErrInternal {
                                        },
                                },
                        },
+                       chan_id: Some(channel_id),
                        shutdown_finish: None,
                }
        }
@@ -250,12 +269,13 @@ impl MsgHandleErrInternal {
                                err,
                                action: msgs::ErrorAction::IgnoreError,
                        },
+                       chan_id: None,
                        shutdown_finish: None,
                }
        }
        #[inline]
        fn from_no_close(err: msgs::LightningError) -> Self {
-               Self { err, shutdown_finish: None }
+               Self { err, chan_id: None, shutdown_finish: None }
        }
        #[inline]
        fn from_finish_shutdown(err: String, channel_id: [u8; 32], shutdown_res: ShutdownResult, channel_update: Option<msgs::ChannelUpdate>) -> Self {
@@ -269,6 +289,7 @@ impl MsgHandleErrInternal {
                                        },
                                },
                        },
+                       chan_id: Some(channel_id),
                        shutdown_finish: Some((shutdown_res, channel_update)),
                }
        }
@@ -276,6 +297,10 @@ impl MsgHandleErrInternal {
        fn from_chan_no_close(err: ChannelError, channel_id: [u8; 32]) -> Self {
                Self {
                        err: match err {
+                               ChannelError::Warn(msg) =>  LightningError {
+                                       err: msg,
+                                       action: msgs::ErrorAction::IgnoreError,
+                               },
                                ChannelError::Ignore(msg) => LightningError {
                                        err: msg,
                                        action: msgs::ErrorAction::IgnoreError,
@@ -299,6 +324,7 @@ impl MsgHandleErrInternal {
                                        },
                                },
                        },
+                       chan_id: Some(channel_id),
                        shutdown_finish: None,
                }
        }
@@ -469,8 +495,11 @@ pub struct ChannelManager<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref,
        /// which may generate a claim event, we may receive similar duplicate claim/fail MonitorEvents
        /// after reloading from disk while replaying blocks against ChannelMonitors.
        ///
+       /// Each payment has each of its MPP part's session_priv bytes in the HashSet of the map (even
+       /// payments over a single path).
+       ///
        /// Locked *after* channel_state.
-       pending_outbound_payments: Mutex<HashSet<[u8; 32]>>,
+       pending_outbound_payments: Mutex<HashMap<MppId, HashSet<[u8; 32]>>>,
 
        our_network_key: SecretKey,
        our_network_pubkey: PublicKey,
@@ -490,6 +519,8 @@ pub struct ChannelManager<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref,
        /// Because adding or removing an entry is rare, we usually take an outer read lock and then
        /// operate on the inner value freely. Sadly, this prevents parallel operation when opening a
        /// new channel.
+       ///
+       /// If also holding `channel_state` lock, must lock `channel_state` prior to `per_peer_state`.
        per_peer_state: RwLock<HashMap<PublicKey, Mutex<PeerState>>>,
 
        pending_events: Mutex<Vec<events::Event>>,
@@ -620,6 +651,19 @@ const CHECK_CLTV_EXPIRY_SANITY: u32 = MIN_CLTV_EXPIRY_DELTA as u32 - LATENCY_GRA
 #[allow(dead_code)]
 const CHECK_CLTV_EXPIRY_SANITY_2: u32 = MIN_CLTV_EXPIRY_DELTA as u32 - LATENCY_GRACE_PERIOD_BLOCKS - 2*CLTV_CLAIM_BUFFER;
 
+/// Information needed for constructing an invoice route hint for this channel.
+#[derive(Clone, Debug, PartialEq)]
+pub struct CounterpartyForwardingInfo {
+       /// Base routing fee in millisatoshis.
+       pub fee_base_msat: u32,
+       /// Amount in millionths of a satoshi the channel will charge per transferred satoshi.
+       pub fee_proportional_millionths: u32,
+       /// The minimum difference in cltv_expiry between an ingoing HTLC and its outgoing counterpart,
+       /// such that the outgoing HTLC is forwardable to this counterparty. See `msgs::ChannelUpdate`'s
+       /// `cltv_expiry_delta` for more details.
+       pub cltv_expiry_delta: u16,
+}
+
 /// Channel parameters which apply to our counterparty. These are split out from [`ChannelDetails`]
 /// to better separate parameters.
 #[derive(Clone, Debug, PartialEq)]
@@ -774,7 +818,7 @@ macro_rules! handle_error {
        ($self: ident, $internal: expr, $counterparty_node_id: expr) => {
                match $internal {
                        Ok(msg) => Ok(msg),
-                       Err(MsgHandleErrInternal { err, shutdown_finish }) => {
+                       Err(MsgHandleErrInternal { err, chan_id, shutdown_finish }) => {
                                #[cfg(debug_assertions)]
                                {
                                        // In testing, ensure there are no deadlocks where the lock is already held upon
@@ -817,6 +861,11 @@ macro_rules! handle_error {
 macro_rules! convert_chan_err {
        ($self: ident, $err: expr, $short_to_id: expr, $channel: expr, $channel_id: expr) => {
                match $err {
+                       ChannelError::Warn(msg) => {
+                               //TODO: Once warning messages are merged, we should send a `warning` message to our
+                               //peer here.
+                               (false, MsgHandleErrInternal::from_chan_no_close(ChannelError::Ignore(msg), $channel_id.clone()))
+                       },
                        ChannelError::Ignore(msg) => {
                                (false, MsgHandleErrInternal::from_chan_no_close(ChannelError::Ignore(msg), $channel_id.clone()))
                        },
@@ -870,6 +919,18 @@ macro_rules! try_chan_entry {
        }
 }
 
+macro_rules! remove_channel {
+       ($channel_state: expr, $entry: expr) => {
+               {
+                       let channel = $entry.remove_entry().1;
+                       if let Some(short_id) = channel.get_short_channel_id() {
+                               $channel_state.short_to_id.remove(&short_id);
+                       }
+                       channel
+               }
+       }
+}
+
 macro_rules! handle_monitor_err {
        ($self: ident, $err: expr, $channel_state: expr, $entry: expr, $action_type: path, $resend_raa: expr, $resend_commitment: expr) => {
                handle_monitor_err!($self, $err, $channel_state, $entry, $action_type, $resend_raa, $resend_commitment, Vec::new(), Vec::new())
@@ -1115,7 +1176,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                pending_msg_events: Vec::new(),
                        }),
                        pending_inbound_payments: Mutex::new(HashMap::new()),
-                       pending_outbound_payments: Mutex::new(HashSet::new()),
+                       pending_outbound_payments: Mutex::new(HashMap::new()),
 
                        our_network_key: keys_manager.get_node_secret(),
                        our_network_pubkey: PublicKey::from_secret_key(&secp_ctx, &keys_manager.get_node_secret()),
@@ -1164,8 +1225,18 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                        return Err(APIError::APIMisuseError { err: format!("Channel value must be at least 1000 satoshis. It was {}", channel_value_satoshis) });
                }
 
-               let config = if override_config.is_some() { override_config.as_ref().unwrap() } else { &self.default_configuration };
-               let channel = Channel::new_outbound(&self.fee_estimator, &self.keys_manager, their_network_key, channel_value_satoshis, push_msat, user_id, config)?;
+               let channel = {
+                       let per_peer_state = self.per_peer_state.read().unwrap();
+                       match per_peer_state.get(&their_network_key) {
+                               Some(peer_state) => {
+                                       let peer_state = peer_state.lock().unwrap();
+                                       let their_features = &peer_state.latest_features;
+                                       let config = if override_config.is_some() { override_config.as_ref().unwrap() } else { &self.default_configuration };
+                                       Channel::new_outbound(&self.fee_estimator, &self.keys_manager, their_network_key, their_features, channel_value_satoshis, push_msat, user_id, config)?
+                               },
+                               None => return Err(APIError::ChannelUnavailable { err: format!("Not connected to node: {}", their_network_key) }),
+                       }
+               };
                let res = channel.get_open_channel(self.genesis_hash.clone());
 
                let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);
@@ -1251,51 +1322,111 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                self.list_channels_with_filter(|&(_, ref channel)| channel.is_live())
        }
 
-       /// Begins the process of closing a channel. After this call (plus some timeout), no new HTLCs
-       /// will be accepted on the given channel, and after additional timeout/the closing of all
-       /// pending HTLCs, the channel will be closed on chain.
-       ///
-       /// May generate a SendShutdown message event on success, which should be relayed.
-       pub fn close_channel(&self, channel_id: &[u8; 32]) -> Result<(), APIError> {
+       fn close_channel_internal(&self, channel_id: &[u8; 32], target_feerate_sats_per_1000_weight: Option<u32>) -> Result<(), APIError> {
                let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);
 
-               let (mut failed_htlcs, chan_option) = {
+               let counterparty_node_id;
+               let mut failed_htlcs: Vec<(HTLCSource, PaymentHash)>;
+               let result: Result<(), _> = loop {
                        let mut channel_state_lock = self.channel_state.lock().unwrap();
                        let channel_state = &mut *channel_state_lock;
                        match channel_state.by_id.entry(channel_id.clone()) {
                                hash_map::Entry::Occupied(mut chan_entry) => {
-                                       let (shutdown_msg, failed_htlcs) = chan_entry.get_mut().get_shutdown()?;
+                                       counterparty_node_id = chan_entry.get().get_counterparty_node_id();
+                                       let per_peer_state = self.per_peer_state.read().unwrap();
+                                       let (shutdown_msg, monitor_update, htlcs) = match per_peer_state.get(&counterparty_node_id) {
+                                               Some(peer_state) => {
+                                                       let peer_state = peer_state.lock().unwrap();
+                                                       let their_features = &peer_state.latest_features;
+                                                       chan_entry.get_mut().get_shutdown(&self.keys_manager, their_features, target_feerate_sats_per_1000_weight)?
+                                               },
+                                               None => return Err(APIError::ChannelUnavailable { err: format!("Not connected to node: {}", counterparty_node_id) }),
+                                       };
+                                       failed_htlcs = htlcs;
+
+                                       // Update the monitor with the shutdown script if necessary.
+                                       if let Some(monitor_update) = monitor_update {
+                                               if let Err(e) = self.chain_monitor.update_channel(chan_entry.get().get_funding_txo().unwrap(), monitor_update) {
+                                                       let (result, is_permanent) =
+                                                               handle_monitor_err!(self, e, channel_state.short_to_id, chan_entry.get_mut(), RAACommitmentOrder::CommitmentFirst, false, false, Vec::new(), Vec::new(), chan_entry.key());
+                                                       if is_permanent {
+                                                               remove_channel!(channel_state, chan_entry);
+                                                               break result;
+                                                       }
+                                               }
+                                       }
+
                                        channel_state.pending_msg_events.push(events::MessageSendEvent::SendShutdown {
-                                               node_id: chan_entry.get().get_counterparty_node_id(),
+                                               node_id: counterparty_node_id,
                                                msg: shutdown_msg
                                        });
+
                                        if chan_entry.get().is_shutdown() {
-                                               if let Some(short_id) = chan_entry.get().get_short_channel_id() {
-                                                       channel_state.short_to_id.remove(&short_id);
+                                               let channel = remove_channel!(channel_state, chan_entry);
+                                               if let Ok(channel_update) = self.get_channel_update_for_broadcast(&channel) {
+                                                       channel_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
+                                                               msg: channel_update
+                                                       });
                                                }
-                                               (failed_htlcs, Some(chan_entry.remove_entry().1))
-                                       } else { (failed_htlcs, None) }
+                                       }
+                                       break Ok(());
                                },
                                hash_map::Entry::Vacant(_) => return Err(APIError::ChannelUnavailable{err: "No such channel".to_owned()})
                        }
                };
+
                for htlc_source in failed_htlcs.drain(..) {
                        self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), htlc_source.0, &htlc_source.1, HTLCFailReason::Reason { failure_code: 0x4000 | 8, data: Vec::new() });
                }
-               let chan_update = if let Some(chan) = chan_option {
-                       self.get_channel_update_for_broadcast(&chan).ok()
-               } else { None };
-
-               if let Some(update) = chan_update {
-                       let mut channel_state = self.channel_state.lock().unwrap();
-                       channel_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
-                               msg: update
-                       });
-               }
 
+               let _ = handle_error!(self, result, counterparty_node_id);
                Ok(())
        }
 
+       /// Begins the process of closing a channel. After this call (plus some timeout), no new HTLCs
+       /// will be accepted on the given channel, and after additional timeout/the closing of all
+       /// pending HTLCs, the channel will be closed on chain.
+       ///
+       ///  * If we are the channel initiator, we will pay between our [`Background`] and
+       ///    [`ChannelConfig::force_close_avoidance_max_fee_satoshis`] plus our [`Normal`] fee
+       ///    estimate.
+       ///  * If our counterparty is the channel initiator, we will require a channel closing
+       ///    transaction feerate of at least our [`Background`] feerate or the feerate which
+       ///    would appear on a force-closure transaction, whichever is lower. We will allow our
+       ///    counterparty to pay as much fee as they'd like, however.
+       ///
+       /// May generate a SendShutdown message event on success, which should be relayed.
+       ///
+       /// [`ChannelConfig::force_close_avoidance_max_fee_satoshis`]: crate::util::config::ChannelConfig::force_close_avoidance_max_fee_satoshis
+       /// [`Background`]: crate::chain::chaininterface::ConfirmationTarget::Background
+       /// [`Normal`]: crate::chain::chaininterface::ConfirmationTarget::Normal
+       pub fn close_channel(&self, channel_id: &[u8; 32]) -> Result<(), APIError> {
+               self.close_channel_internal(channel_id, None)
+       }
+
+       /// Begins the process of closing a channel. After this call (plus some timeout), no new HTLCs
+       /// will be accepted on the given channel, and after additional timeout/the closing of all
+       /// pending HTLCs, the channel will be closed on chain.
+       ///
+       /// `target_feerate_sat_per_1000_weight` has different meanings depending on if we initiated
+       /// the channel being closed or not:
+       ///  * If we are the channel initiator, we will pay at least this feerate on the closing
+       ///    transaction. The upper-bound is set by
+       ///    [`ChannelConfig::force_close_avoidance_max_fee_satoshis`] plus our [`Normal`] fee
+       ///    estimate (or `target_feerate_sat_per_1000_weight`, if it is greater).
+       ///  * If our counterparty is the channel initiator, we will refuse to accept a channel closure
+       ///    transaction feerate below `target_feerate_sat_per_1000_weight` (or the feerate which
+       ///    will appear on a force-closure transaction, whichever is lower).
+       ///
+       /// May generate a SendShutdown message event on success, which should be relayed.
+       ///
+       /// [`ChannelConfig::force_close_avoidance_max_fee_satoshis`]: crate::util::config::ChannelConfig::force_close_avoidance_max_fee_satoshis
+       /// [`Background`]: crate::chain::chaininterface::ConfirmationTarget::Background
+       /// [`Normal`]: crate::chain::chaininterface::ConfirmationTarget::Normal
+       pub fn close_channel_with_target_feerate(&self, channel_id: &[u8; 32], target_feerate_sats_per_1000_weight: u32) -> Result<(), APIError> {
+               self.close_channel_internal(channel_id, Some(target_feerate_sats_per_1000_weight))
+       }
+
        #[inline]
        fn finish_force_close_channel(&self, shutdown_res: ShutdownResult) {
                let (monitor_update_option, mut failed_htlcs) = shutdown_res;
@@ -1432,8 +1563,8 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
 
                let mut chacha = ChaCha20::new(&rho, &[0u8; 8]);
                let mut chacha_stream = ChaChaReader { chacha: &mut chacha, read: Cursor::new(&msg.onion_routing_packet.hop_data[..]) };
-               let (next_hop_data, next_hop_hmac) = {
-                       match msgs::OnionHopData::read(&mut chacha_stream) {
+               let (next_hop_data, next_hop_hmac): (msgs::OnionHopData, _) = {
+                       match <msgs::OnionHopData as Readable>::read(&mut chacha_stream) {
                                Err(err) => {
                                        let error_code = match err {
                                                msgs::DecodeError::UnknownVersion => 0x4000 | 1, // unknown realm byte
@@ -1727,7 +1858,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
        }
 
        // Only public for testing, this should otherwise never be called direcly
-       pub(crate) fn send_payment_along_path(&self, path: &Vec<RouteHop>, payment_hash: &PaymentHash, payment_secret: &Option<PaymentSecret>, total_value: u64, cur_height: u32, keysend_preimage: &Option<PaymentPreimage>) -> Result<(), APIError> {
+       pub(crate) fn send_payment_along_path(&self, path: &Vec<RouteHop>, payment_hash: &PaymentHash, payment_secret: &Option<PaymentSecret>, total_value: u64, cur_height: u32, mpp_id: MppId, keysend_preimage: &Option<PaymentPreimage>) -> Result<(), APIError> {
                log_trace!(self.logger, "Attempting to send payment for path with next hop {}", path.first().unwrap().short_channel_id);
                let prng_seed = self.keys_manager.get_secure_random_bytes();
                let session_priv_bytes = self.keys_manager.get_secure_random_bytes();
@@ -1742,7 +1873,9 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                let onion_packet = onion_utils::construct_onion_packet(onion_payloads, onion_keys, prng_seed, payment_hash);
 
                let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);
-               assert!(self.pending_outbound_payments.lock().unwrap().insert(session_priv_bytes));
+               let mut pending_outbounds = self.pending_outbound_payments.lock().unwrap();
+               let sessions = pending_outbounds.entry(mpp_id).or_insert(HashSet::new());
+               assert!(sessions.insert(session_priv_bytes));
 
                let err: Result<(), _> = loop {
                        let mut channel_lock = self.channel_state.lock().unwrap();
@@ -1764,6 +1897,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                                path: path.clone(),
                                                session_priv: session_priv.clone(),
                                                first_hop_htlc_msat: htlc_msat,
+                                               mpp_id,
                                        }, onion_packet, &self.logger), channel_state, chan)
                                } {
                                        Some((update_add, commitment_signed, monitor_update)) => {
@@ -1857,9 +1991,13 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                        // for now more than 10 paths likely carries too much one-path failure.
                        return Err(PaymentSendFailure::ParameterError(APIError::RouteError{err: "Sending over more than 10 paths is not currently supported"}));
                }
+               if payment_secret.is_none() && route.paths.len() > 1 {
+                       return Err(PaymentSendFailure::ParameterError(APIError::APIMisuseError{err: "Payment secret is required for multi-path payments".to_string()}));
+               }
                let mut total_value = 0;
                let our_node_id = self.get_our_node_id();
                let mut path_errs = Vec::with_capacity(route.paths.len());
+               let mpp_id = MppId(self.keys_manager.get_secure_random_bytes());
                'path_check: for path in route.paths.iter() {
                        if path.len() < 1 || path.len() > 20 {
                                path_errs.push(Err(APIError::RouteError{err: "Path didn't go anywhere/had bogus size"}));
@@ -1881,7 +2019,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                let cur_height = self.best_block.read().unwrap().height() + 1;
                let mut results = Vec::new();
                for path in route.paths.iter() {
-                       results.push(self.send_payment_along_path(&path, &payment_hash, payment_secret, total_value, cur_height, &keysend_preimage));
+                       results.push(self.send_payment_along_path(&path, &payment_hash, payment_secret, total_value, cur_height, mpp_id, &keysend_preimage));
                }
                let mut has_ok = false;
                let mut has_err = false;
@@ -1911,9 +2049,13 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
        /// would be able to guess -- otherwise, an intermediate node may claim the payment and it will
        /// never reach the recipient.
        ///
+       /// See [`send_payment`] documentation for more details on the return value of this function.
+       ///
        /// Similar to regular payments, you MUST NOT reuse a `payment_preimage` value. See
        /// [`send_payment`] for more information about the risks of duplicate preimage usage.
        ///
+       /// Note that `route` must have exactly one path.
+       ///
        /// [`send_payment`]: Self::send_payment
        pub fn send_spontaneous_payment(&self, route: &Route, payment_preimage: Option<PaymentPreimage>) -> Result<PaymentHash, PaymentSendFailure> {
                let preimage = match payment_preimage {
@@ -2270,9 +2412,9 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                                                        // close channel and then send error message to peer.
                                                                        let counterparty_node_id = chan.get().get_counterparty_node_id();
                                                                        let err: Result<(), _>  = match e {
-                                                                               ChannelError::Ignore(_) => {
+                                                                               ChannelError::Ignore(_) | ChannelError::Warn(_) => {
                                                                                        panic!("Stated return value requirements in send_commitment() were not met");
-                                                                               },
+                                                                               }
                                                                                ChannelError::Close(msg) => {
                                                                                        log_trace!(self.logger, "Closing channel {} due to Close-required error: {}", log_bytes!(chan.key()[..]), msg);
                                                                                        let (channel_id, mut channel) = chan.remove_entry();
@@ -2508,48 +2650,160 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                self.process_background_events();
        }
 
-       /// If a peer is disconnected we mark any channels with that peer as 'disabled'.
-       /// After some time, if channels are still disabled we need to broadcast a ChannelUpdate
-       /// to inform the network about the uselessness of these channels.
+       fn update_channel_fee(&self, short_to_id: &mut HashMap<u64, [u8; 32]>, pending_msg_events: &mut Vec<events::MessageSendEvent>, chan_id: &[u8; 32], chan: &mut Channel<Signer>, new_feerate: u32) -> (bool, NotifyOption, Result<(), MsgHandleErrInternal>) {
+               if !chan.is_outbound() { return (true, NotifyOption::SkipPersist, Ok(())); }
+               // If the feerate has decreased by less than half, don't bother
+               if new_feerate <= chan.get_feerate() && new_feerate * 2 > chan.get_feerate() {
+                       log_trace!(self.logger, "Channel {} does not qualify for a feerate change from {} to {}.",
+                               log_bytes!(chan_id[..]), chan.get_feerate(), new_feerate);
+                       return (true, NotifyOption::SkipPersist, Ok(()));
+               }
+               if !chan.is_live() {
+                       log_trace!(self.logger, "Channel {} does not qualify for a feerate change from {} to {} as it cannot currently be updated (probably the peer is disconnected).",
+                               log_bytes!(chan_id[..]), chan.get_feerate(), new_feerate);
+                       return (true, NotifyOption::SkipPersist, Ok(()));
+               }
+               log_trace!(self.logger, "Channel {} qualifies for a feerate change from {} to {}.",
+                       log_bytes!(chan_id[..]), chan.get_feerate(), new_feerate);
+
+               let mut retain_channel = true;
+               let res = match chan.send_update_fee_and_commit(new_feerate, &self.logger) {
+                       Ok(res) => Ok(res),
+                       Err(e) => {
+                               let (drop, res) = convert_chan_err!(self, e, short_to_id, chan, chan_id);
+                               if drop { retain_channel = false; }
+                               Err(res)
+                       }
+               };
+               let ret_err = match res {
+                       Ok(Some((update_fee, commitment_signed, monitor_update))) => {
+                               if let Err(e) = self.chain_monitor.update_channel(chan.get_funding_txo().unwrap(), monitor_update) {
+                                       let (res, drop) = handle_monitor_err!(self, e, short_to_id, chan, RAACommitmentOrder::CommitmentFirst, false, true, Vec::new(), Vec::new(), chan_id);
+                                       if drop { retain_channel = false; }
+                                       res
+                               } else {
+                                       pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs {
+                                               node_id: chan.get_counterparty_node_id(),
+                                               updates: msgs::CommitmentUpdate {
+                                                       update_add_htlcs: Vec::new(),
+                                                       update_fulfill_htlcs: Vec::new(),
+                                                       update_fail_htlcs: Vec::new(),
+                                                       update_fail_malformed_htlcs: Vec::new(),
+                                                       update_fee: Some(update_fee),
+                                                       commitment_signed,
+                                               },
+                                       });
+                                       Ok(())
+                               }
+                       },
+                       Ok(None) => Ok(()),
+                       Err(e) => Err(e),
+               };
+               (retain_channel, NotifyOption::DoPersist, ret_err)
+       }
+
+       #[cfg(fuzzing)]
+       /// In chanmon_consistency we want to sometimes do the channel fee updates done in
+       /// timer_tick_occurred, but we can't generate the disabled channel updates as it considers
+       /// these a fuzz failure (as they usually indicate a channel force-close, which is exactly what
+       /// it wants to detect). Thus, we have a variant exposed here for its benefit.
+       pub fn maybe_update_chan_fees(&self) {
+               PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock, &self.persistence_notifier, || {
+                       let mut should_persist = NotifyOption::SkipPersist;
+
+                       let new_feerate = self.fee_estimator.get_est_sat_per_1000_weight(ConfirmationTarget::Normal);
+
+                       let mut handle_errors = Vec::new();
+                       {
+                               let mut channel_state_lock = self.channel_state.lock().unwrap();
+                               let channel_state = &mut *channel_state_lock;
+                               let pending_msg_events = &mut channel_state.pending_msg_events;
+                               let short_to_id = &mut channel_state.short_to_id;
+                               channel_state.by_id.retain(|chan_id, chan| {
+                                       let (retain_channel, chan_needs_persist, err) = self.update_channel_fee(short_to_id, pending_msg_events, chan_id, chan, new_feerate);
+                                       if chan_needs_persist == NotifyOption::DoPersist { should_persist = NotifyOption::DoPersist; }
+                                       if err.is_err() {
+                                               handle_errors.push(err);
+                                       }
+                                       retain_channel
+                               });
+                       }
+
+                       should_persist
+               });
+       }
+
+       /// Performs actions which should happen on startup and roughly once per minute thereafter.
        ///
-       /// This method handles all the details, and must be called roughly once per minute.
+       /// This currently includes:
+       ///  * Increasing or decreasing the on-chain feerate estimates for our outbound channels,
+       ///  * Broadcasting `ChannelUpdate` messages if we've been disconnected from our peer for more
+       ///    than a minute, informing the network that they should no longer attempt to route over
+       ///    the channel.
        ///
-       /// Note that in some rare cases this may generate a `chain::Watch::update_channel` call.
+       /// Note that this may cause reentrancy through `chain::Watch::update_channel` calls or feerate
+       /// estimate fetches.
        pub fn timer_tick_occurred(&self) {
                PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock, &self.persistence_notifier, || {
                        let mut should_persist = NotifyOption::SkipPersist;
                        if self.process_background_events() { should_persist = NotifyOption::DoPersist; }
 
-                       let mut channel_state_lock = self.channel_state.lock().unwrap();
-                       let channel_state = &mut *channel_state_lock;
-                       for (_, chan) in channel_state.by_id.iter_mut() {
-                               match chan.channel_update_status() {
-                                       ChannelUpdateStatus::Enabled if !chan.is_live() => chan.set_channel_update_status(ChannelUpdateStatus::DisabledStaged),
-                                       ChannelUpdateStatus::Disabled if chan.is_live() => chan.set_channel_update_status(ChannelUpdateStatus::EnabledStaged),
-                                       ChannelUpdateStatus::DisabledStaged if chan.is_live() => chan.set_channel_update_status(ChannelUpdateStatus::Enabled),
-                                       ChannelUpdateStatus::EnabledStaged if !chan.is_live() => chan.set_channel_update_status(ChannelUpdateStatus::Disabled),
-                                       ChannelUpdateStatus::DisabledStaged if !chan.is_live() => {
-                                               if let Ok(update) = self.get_channel_update_for_broadcast(&chan) {
-                                                       channel_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
-                                                               msg: update
-                                                       });
-                                               }
-                                               should_persist = NotifyOption::DoPersist;
-                                               chan.set_channel_update_status(ChannelUpdateStatus::Disabled);
-                                       },
-                                       ChannelUpdateStatus::EnabledStaged if chan.is_live() => {
-                                               if let Ok(update) = self.get_channel_update_for_broadcast(&chan) {
-                                                       channel_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
-                                                               msg: update
-                                                       });
-                                               }
-                                               should_persist = NotifyOption::DoPersist;
-                                               chan.set_channel_update_status(ChannelUpdateStatus::Enabled);
-                                       },
-                                       _ => {},
-                               }
+                       let new_feerate = self.fee_estimator.get_est_sat_per_1000_weight(ConfirmationTarget::Normal);
+
+                       let mut handle_errors = Vec::new();
+                       {
+                               let mut channel_state_lock = self.channel_state.lock().unwrap();
+                               let channel_state = &mut *channel_state_lock;
+                               let pending_msg_events = &mut channel_state.pending_msg_events;
+                               let short_to_id = &mut channel_state.short_to_id;
+                               channel_state.by_id.retain(|chan_id, chan| {
+                                       let counterparty_node_id = chan.get_counterparty_node_id();
+                                       let (retain_channel, chan_needs_persist, err) = self.update_channel_fee(short_to_id, pending_msg_events, chan_id, chan, new_feerate);
+                                       if chan_needs_persist == NotifyOption::DoPersist { should_persist = NotifyOption::DoPersist; }
+                                       if err.is_err() {
+                                               handle_errors.push((err, counterparty_node_id));
+                                       }
+                                       if !retain_channel { return false; }
+
+                                       if let Err(e) = chan.timer_check_closing_negotiation_progress() {
+                                               let (needs_close, err) = convert_chan_err!(self, e, short_to_id, chan, chan_id);
+                                               handle_errors.push((Err(err), chan.get_counterparty_node_id()));
+                                               if needs_close { return false; }
+                                       }
+
+                                       match chan.channel_update_status() {
+                                               ChannelUpdateStatus::Enabled if !chan.is_live() => chan.set_channel_update_status(ChannelUpdateStatus::DisabledStaged),
+                                               ChannelUpdateStatus::Disabled if chan.is_live() => chan.set_channel_update_status(ChannelUpdateStatus::EnabledStaged),
+                                               ChannelUpdateStatus::DisabledStaged if chan.is_live() => chan.set_channel_update_status(ChannelUpdateStatus::Enabled),
+                                               ChannelUpdateStatus::EnabledStaged if !chan.is_live() => chan.set_channel_update_status(ChannelUpdateStatus::Disabled),
+                                               ChannelUpdateStatus::DisabledStaged if !chan.is_live() => {
+                                                       if let Ok(update) = self.get_channel_update_for_broadcast(&chan) {
+                                                               pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
+                                                                       msg: update
+                                                               });
+                                                       }
+                                                       should_persist = NotifyOption::DoPersist;
+                                                       chan.set_channel_update_status(ChannelUpdateStatus::Disabled);
+                                               },
+                                               ChannelUpdateStatus::EnabledStaged if chan.is_live() => {
+                                                       if let Ok(update) = self.get_channel_update_for_broadcast(&chan) {
+                                                               pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
+                                                                       msg: update
+                                                               });
+                                                       }
+                                                       should_persist = NotifyOption::DoPersist;
+                                                       chan.set_channel_update_status(ChannelUpdateStatus::Enabled);
+                                               },
+                                               _ => {},
+                                       }
+
+                                       true
+                               });
                        }
 
+                       for (err, counterparty_node_id) in handle_errors.drain(..) {
+                               let _ = handle_error!(self, err, counterparty_node_id);
+                       }
                        should_persist
                });
        }
@@ -2600,22 +2854,28 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                        self.fail_htlc_backwards_internal(channel_state,
                                                htlc_src, &payment_hash, HTLCFailReason::Reason { failure_code, data: onion_failure_data});
                                },
-                               HTLCSource::OutboundRoute { session_priv, .. } => {
-                                       if {
-                                               let mut session_priv_bytes = [0; 32];
-                                               session_priv_bytes.copy_from_slice(&session_priv[..]);
-                                               self.pending_outbound_payments.lock().unwrap().remove(&session_priv_bytes)
-                                       } {
-                                               self.pending_events.lock().unwrap().push(
-                                                       events::Event::PaymentFailed {
-                                                               payment_hash,
-                                                               rejected_by_dest: false,
-#[cfg(test)]
-                                                               error_code: None,
-#[cfg(test)]
-                                                               error_data: None,
+                               HTLCSource::OutboundRoute { session_priv, mpp_id, .. } => {
+                                       let mut session_priv_bytes = [0; 32];
+                                       session_priv_bytes.copy_from_slice(&session_priv[..]);
+                                       let mut outbounds = self.pending_outbound_payments.lock().unwrap();
+                                       if let hash_map::Entry::Occupied(mut sessions) = outbounds.entry(mpp_id) {
+                                               if sessions.get_mut().remove(&session_priv_bytes) {
+                                                       self.pending_events.lock().unwrap().push(
+                                                               events::Event::PaymentFailed {
+                                                                       payment_hash,
+                                                                       rejected_by_dest: false,
+                                                                       network_update: None,
+                                                                       all_paths_failed: sessions.get().len() == 0,
+                                                                       #[cfg(test)]
+                                                                       error_code: None,
+                                                                       #[cfg(test)]
+                                                                       error_data: None,
+                                                               }
+                                                       );
+                                                       if sessions.get().len() == 0 {
+                                                               sessions.remove();
                                                        }
-                                               )
+                                               }
                                        } else {
                                                log_trace!(self.logger, "Received duplicative fail for HTLC with payment_hash {}", log_bytes!(payment_hash.0));
                                        }
@@ -2640,12 +2900,21 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                // from block_connected which may run during initialization prior to the chain_monitor
                // being fully configured. See the docs for `ChannelManagerReadArgs` for more.
                match source {
-                       HTLCSource::OutboundRoute { ref path, session_priv, .. } => {
-                               if {
-                                       let mut session_priv_bytes = [0; 32];
-                                       session_priv_bytes.copy_from_slice(&session_priv[..]);
-                                       !self.pending_outbound_payments.lock().unwrap().remove(&session_priv_bytes)
-                               } {
+                       HTLCSource::OutboundRoute { ref path, session_priv, mpp_id, .. } => {
+                               let mut session_priv_bytes = [0; 32];
+                               session_priv_bytes.copy_from_slice(&session_priv[..]);
+                               let mut outbounds = self.pending_outbound_payments.lock().unwrap();
+                               let mut all_paths_failed = false;
+                               if let hash_map::Entry::Occupied(mut sessions) = outbounds.entry(mpp_id) {
+                                       if !sessions.get_mut().remove(&session_priv_bytes) {
+                                               log_trace!(self.logger, "Received duplicative fail for HTLC with payment_hash {}", log_bytes!(payment_hash.0));
+                                               return;
+                                       }
+                                       if sessions.get().len() == 0 {
+                                               all_paths_failed = true;
+                                               sessions.remove();
+                                       }
+                               } else {
                                        log_trace!(self.logger, "Received duplicative fail for HTLC with payment_hash {}", log_bytes!(payment_hash.0));
                                        return;
                                }
@@ -2654,23 +2923,18 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                match &onion_error {
                                        &HTLCFailReason::LightningError { ref err } => {
 #[cfg(test)]
-                                               let (channel_update, payment_retryable, onion_error_code, onion_error_data) = onion_utils::process_onion_failure(&self.secp_ctx, &self.logger, &source, err.data.clone());
+                                               let (network_update, payment_retryable, onion_error_code, onion_error_data) = onion_utils::process_onion_failure(&self.secp_ctx, &self.logger, &source, err.data.clone());
 #[cfg(not(test))]
-                                               let (channel_update, payment_retryable, _, _) = onion_utils::process_onion_failure(&self.secp_ctx, &self.logger, &source, err.data.clone());
+                                               let (network_update, payment_retryable, _, _) = onion_utils::process_onion_failure(&self.secp_ctx, &self.logger, &source, err.data.clone());
                                                // TODO: If we decided to blame ourselves (or one of our channels) in
                                                // process_onion_failure we should close that channel as it implies our
                                                // next-hop is needlessly blaming us!
-                                               if let Some(update) = channel_update {
-                                                       self.channel_state.lock().unwrap().pending_msg_events.push(
-                                                               events::MessageSendEvent::PaymentFailureNetworkUpdate {
-                                                                       update,
-                                                               }
-                                                       );
-                                               }
                                                self.pending_events.lock().unwrap().push(
                                                        events::Event::PaymentFailed {
                                                                payment_hash: payment_hash.clone(),
                                                                rejected_by_dest: !payment_retryable,
+                                                               network_update,
+                                                               all_paths_failed,
 #[cfg(test)]
                                                                error_code: onion_error_code,
 #[cfg(test)]
@@ -2685,7 +2949,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                                        ref data,
                                                        .. } => {
                                                // we get a fail_malformed_htlc from the first hop
-                                               // TODO: We'd like to generate a PaymentFailureNetworkUpdate for temporary
+                                               // TODO: We'd like to generate a NetworkUpdate for temporary
                                                // failures here, but that would be insufficient as get_route
                                                // generally ignores its view of our own channels as we provide them via
                                                // ChannelDetails.
@@ -2695,6 +2959,8 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                                        events::Event::PaymentFailed {
                                                                payment_hash: payment_hash.clone(),
                                                                rejected_by_dest: path.len() == 1,
+                                                               network_update: None,
+                                                               all_paths_failed,
 #[cfg(test)]
                                                                error_code: Some(*failure_code),
 #[cfg(test)]
@@ -2804,7 +3070,13 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                                        } else { errs.push((pk, err)); }
                                                },
                                                ClaimFundsFromHop::PrevHopForceClosed => unreachable!("We already checked for channel existence, we can't fail here!"),
-                                               _ => claimed_any_htlcs = true,
+                                               ClaimFundsFromHop::DuplicateClaim => {
+                                                       // While we should never get here in most cases, if we do, it likely
+                                                       // indicates that the HTLC was timed out some time ago and is no longer
+                                                       // available to be claimed. Thus, it does not make sense to set
+                                                       // `claimed_any_htlcs`.
+                                               },
+                                               ClaimFundsFromHop::Success(_) => claimed_any_htlcs = true,
                                        }
                                }
                        }
@@ -2885,17 +3157,18 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
 
        fn claim_funds_internal(&self, mut channel_state_lock: MutexGuard<ChannelHolder<Signer>>, source: HTLCSource, payment_preimage: PaymentPreimage, forwarded_htlc_value_msat: Option<u64>, from_onchain: bool) {
                match source {
-                       HTLCSource::OutboundRoute { session_priv, .. } => {
+                       HTLCSource::OutboundRoute { session_priv, mpp_id, .. } => {
                                mem::drop(channel_state_lock);
-                               if {
-                                       let mut session_priv_bytes = [0; 32];
-                                       session_priv_bytes.copy_from_slice(&session_priv[..]);
-                                       self.pending_outbound_payments.lock().unwrap().remove(&session_priv_bytes)
-                               } {
-                                       let mut pending_events = self.pending_events.lock().unwrap();
-                                       pending_events.push(events::Event::PaymentSent {
-                                               payment_preimage
-                                       });
+                               let mut session_priv_bytes = [0; 32];
+                               session_priv_bytes.copy_from_slice(&session_priv[..]);
+                               let mut outbounds = self.pending_outbound_payments.lock().unwrap();
+                               let found_payment = if let Some(mut sessions) = outbounds.remove(&mpp_id) {
+                                       sessions.remove(&session_priv_bytes)
+                               } else { false };
+                               if found_payment {
+                                       self.pending_events.lock().unwrap().push(
+                                               events::Event::PaymentSent { payment_preimage }
+                                       );
                                } else {
                                        log_trace!(self.logger, "Received duplicative fulfill for HTLC with payment_preimage {}", log_bytes!(payment_preimage.0));
                                }
@@ -3020,7 +3293,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                        return Err(MsgHandleErrInternal::send_err_msg_no_close("Unknown genesis block hash".to_owned(), msg.temporary_channel_id.clone()));
                }
 
-               let channel = Channel::new_from_req(&self.fee_estimator, &self.keys_manager, counterparty_node_id.clone(), their_features, msg, 0, &self.default_configuration)
+               let channel = Channel::new_from_req(&self.fee_estimator, &self.keys_manager, counterparty_node_id.clone(), &their_features, msg, 0, &self.default_configuration)
                        .map_err(|e| MsgHandleErrInternal::from_chan_no_close(e, msg.temporary_channel_id))?;
                let mut channel_state_lock = self.channel_state.lock().unwrap();
                let channel_state = &mut *channel_state_lock;
@@ -3046,7 +3319,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                        if chan.get().get_counterparty_node_id() != *counterparty_node_id {
                                                return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!".to_owned(), msg.temporary_channel_id));
                                        }
-                                       try_chan_entry!(self, chan.get_mut().accept_channel(&msg, &self.default_configuration, their_features), channel_state, chan);
+                                       try_chan_entry!(self, chan.get_mut().accept_channel(&msg, &self.default_configuration, &their_features), channel_state, chan);
                                        (chan.get().get_value_satoshis(), chan.get().get_funding_redeemscript().to_v0_p2wsh(), chan.get().get_user_id())
                                },
                                hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel".to_owned(), msg.temporary_channel_id))
@@ -3183,7 +3456,8 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
        }
 
        fn internal_shutdown(&self, counterparty_node_id: &PublicKey, their_features: &InitFeatures, msg: &msgs::Shutdown) -> Result<(), MsgHandleErrInternal> {
-               let (mut dropped_htlcs, chan_option) = {
+               let mut dropped_htlcs: Vec<(HTLCSource, PaymentHash)>;
+               let result: Result<(), _> = loop {
                        let mut channel_state_lock = self.channel_state.lock().unwrap();
                        let channel_state = &mut *channel_state_lock;
 
@@ -3192,25 +3466,36 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                        if chan_entry.get().get_counterparty_node_id() != *counterparty_node_id {
                                                return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!".to_owned(), msg.channel_id));
                                        }
-                                       let (shutdown, closing_signed, dropped_htlcs) = try_chan_entry!(self, chan_entry.get_mut().shutdown(&self.fee_estimator, &their_features, &msg), channel_state, chan_entry);
+
+                                       if !chan_entry.get().received_shutdown() {
+                                               log_info!(self.logger, "Received a shutdown message from our counterparty for channel {}{}.",
+                                                       log_bytes!(msg.channel_id),
+                                                       if chan_entry.get().sent_shutdown() { " after we initiated shutdown" } else { "" });
+                                       }
+
+                                       let (shutdown, monitor_update, htlcs) = try_chan_entry!(self, chan_entry.get_mut().shutdown(&self.keys_manager, &their_features, &msg), channel_state, chan_entry);
+                                       dropped_htlcs = htlcs;
+
+                                       // Update the monitor with the shutdown script if necessary.
+                                       if let Some(monitor_update) = monitor_update {
+                                               if let Err(e) = self.chain_monitor.update_channel(chan_entry.get().get_funding_txo().unwrap(), monitor_update) {
+                                                       let (result, is_permanent) =
+                                                               handle_monitor_err!(self, e, channel_state.short_to_id, chan_entry.get_mut(), RAACommitmentOrder::CommitmentFirst, false, false, Vec::new(), Vec::new(), chan_entry.key());
+                                                       if is_permanent {
+                                                               remove_channel!(channel_state, chan_entry);
+                                                               break result;
+                                                       }
+                                               }
+                                       }
+
                                        if let Some(msg) = shutdown {
                                                channel_state.pending_msg_events.push(events::MessageSendEvent::SendShutdown {
-                                                       node_id: counterparty_node_id.clone(),
-                                                       msg,
-                                               });
-                                       }
-                                       if let Some(msg) = closing_signed {
-                                               channel_state.pending_msg_events.push(events::MessageSendEvent::SendClosingSigned {
-                                                       node_id: counterparty_node_id.clone(),
+                                                       node_id: *counterparty_node_id,
                                                        msg,
                                                });
                                        }
-                                       if chan_entry.get().is_shutdown() {
-                                               if let Some(short_id) = chan_entry.get().get_short_channel_id() {
-                                                       channel_state.short_to_id.remove(&short_id);
-                                               }
-                                               (dropped_htlcs, Some(chan_entry.remove_entry().1))
-                                       } else { (dropped_htlcs, None) }
+
+                                       break Ok(());
                                },
                                hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel".to_owned(), msg.channel_id))
                        }
@@ -3218,14 +3503,8 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                for htlc_source in dropped_htlcs.drain(..) {
                        self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), htlc_source.0, &htlc_source.1, HTLCFailReason::Reason { failure_code: 0x4000 | 8, data: Vec::new() });
                }
-               if let Some(chan) = chan_option {
-                       if let Ok(update) = self.get_channel_update_for_broadcast(&chan) {
-                               let mut channel_state = self.channel_state.lock().unwrap();
-                               channel_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
-                                       msg: update
-                               });
-                       }
-               }
+
+               let _ = handle_error!(self, result, *counterparty_node_id);
                Ok(())
        }
 
@@ -3295,33 +3574,34 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                }
 
                                let create_pending_htlc_status = |chan: &Channel<Signer>, pending_forward_info: PendingHTLCStatus, error_code: u16| {
-                                       // Ensure error_code has the UPDATE flag set, since by default we send a
-                                       // channel update along as part of failing the HTLC.
-                                       assert!((error_code & 0x1000) != 0);
                                        // If the update_add is completely bogus, the call will Err and we will close,
                                        // but if we've sent a shutdown and they haven't acknowledged it yet, we just
                                        // want to reject the new HTLC and fail it backwards instead of forwarding.
                                        match pending_forward_info {
                                                PendingHTLCStatus::Forward(PendingHTLCInfo { ref incoming_shared_secret, .. }) => {
-                                                       let reason = if let Ok(upd) = self.get_channel_update_for_unicast(chan) {
-                                                               onion_utils::build_first_hop_failure_packet(incoming_shared_secret, error_code, &{
-                                                                       let mut res = Vec::with_capacity(8 + 128);
-                                                                       // TODO: underspecified, follow https://github.com/lightningnetwork/lightning-rfc/issues/791
-                                                                       res.extend_from_slice(&byte_utils::be16_to_array(0));
-                                                                       res.extend_from_slice(&upd.encode_with_len()[..]);
-                                                                       res
-                                                               }[..])
+                                                       let reason = if (error_code & 0x1000) != 0 {
+                                                               if let Ok(upd) = self.get_channel_update_for_unicast(chan) {
+                                                                       onion_utils::build_first_hop_failure_packet(incoming_shared_secret, error_code, &{
+                                                                               let mut res = Vec::with_capacity(8 + 128);
+                                                                               // TODO: underspecified, follow https://github.com/lightningnetwork/lightning-rfc/issues/791
+                                                                               res.extend_from_slice(&byte_utils::be16_to_array(0));
+                                                                               res.extend_from_slice(&upd.encode_with_len()[..]);
+                                                                               res
+                                                                       }[..])
+                                                               } else {
+                                                                       // The only case where we'd be unable to
+                                                                       // successfully get a channel update is if the
+                                                                       // channel isn't in the fully-funded state yet,
+                                                                       // implying our counterparty is trying to route
+                                                                       // payments over the channel back to themselves
+                                                                       // (because no one else should know the short_id
+                                                                       // is a lightning channel yet). We should have
+                                                                       // no problem just calling this
+                                                                       // unknown_next_peer (0x4000|10).
+                                                                       onion_utils::build_first_hop_failure_packet(incoming_shared_secret, 0x4000|10, &[])
+                                                               }
                                                        } else {
-                                                               // The only case where we'd be unable to
-                                                               // successfully get a channel update is if the
-                                                               // channel isn't in the fully-funded state yet,
-                                                               // implying our counterparty is trying to route
-                                                               // payments over the channel back to themselves
-                                                               // (cause no one else should know the short_id
-                                                               // is a lightning channel yet). We should have
-                                                               // no problem just calling this
-                                                               // unknown_next_peer (0x4000|10).
-                                                               onion_utils::build_first_hop_failure_packet(incoming_shared_secret, 0x4000|10, &[])
+                                                               onion_utils::build_first_hop_failure_packet(incoming_shared_secret, error_code, &[])
                                                        };
                                                        let msg = msgs::UpdateFailHTLC {
                                                                channel_id: msg.channel_id,
@@ -3400,8 +3680,8 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                if chan.get().get_counterparty_node_id() != *counterparty_node_id {
                                        return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!".to_owned(), msg.channel_id));
                                }
-                               let (revoke_and_ack, commitment_signed, closing_signed, monitor_update) =
-                                       match chan.get_mut().commitment_signed(&msg, &self.fee_estimator, &self.logger) {
+                               let (revoke_and_ack, commitment_signed, monitor_update) =
+                                       match chan.get_mut().commitment_signed(&msg, &self.logger) {
                                                Err((None, e)) => try_chan_entry!(self, Err(e), channel_state, chan),
                                                Err((Some(update), e)) => {
                                                        assert!(chan.get().is_awaiting_monitor_update());
@@ -3413,7 +3693,6 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                        };
                                if let Err(e) = self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), monitor_update) {
                                        return_monitor_err!(self, e, channel_state, chan, RAACommitmentOrder::RevokeAndACKFirst, true, commitment_signed.is_some());
-                                       //TODO: Rebroadcast closing_signed if present on monitor update restoration
                                }
                                channel_state.pending_msg_events.push(events::MessageSendEvent::SendRevokeAndACK {
                                        node_id: counterparty_node_id.clone(),
@@ -3432,12 +3711,6 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                                },
                                        });
                                }
-                               if let Some(msg) = closing_signed {
-                                       channel_state.pending_msg_events.push(events::MessageSendEvent::SendClosingSigned {
-                                               node_id: counterparty_node_id.clone(),
-                                               msg,
-                                       });
-                               }
                                Ok(())
                        },
                        hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel".to_owned(), msg.channel_id))
@@ -3493,12 +3766,12 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                                break Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!".to_owned(), msg.channel_id));
                                        }
                                        let was_frozen_for_monitor = chan.get().is_awaiting_monitor_update();
-                                       let (commitment_update, pending_forwards, pending_failures, closing_signed, monitor_update, htlcs_to_fail_in) =
-                                               break_chan_entry!(self, chan.get_mut().revoke_and_ack(&msg, &self.fee_estimator, &self.logger), channel_state, chan);
+                                       let (commitment_update, pending_forwards, pending_failures, monitor_update, htlcs_to_fail_in) =
+                                               break_chan_entry!(self, chan.get_mut().revoke_and_ack(&msg, &self.logger), channel_state, chan);
                                        htlcs_to_fail = htlcs_to_fail_in;
                                        if let Err(e) = self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), monitor_update) {
                                                if was_frozen_for_monitor {
-                                                       assert!(commitment_update.is_none() && closing_signed.is_none() && pending_forwards.is_empty() && pending_failures.is_empty());
+                                                       assert!(commitment_update.is_none() && pending_forwards.is_empty() && pending_failures.is_empty());
                                                        break Err(MsgHandleErrInternal::ignore_no_close("Previous monitor update failure prevented responses to RAA".to_owned()));
                                                } else {
                                                        if let Err(e) = handle_monitor_err!(self, e, channel_state, chan, RAACommitmentOrder::CommitmentFirst, false, commitment_update.is_some(), pending_forwards, pending_failures) {
@@ -3512,12 +3785,6 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                                        updates,
                                                });
                                        }
-                                       if let Some(msg) = closing_signed {
-                                               channel_state.pending_msg_events.push(events::MessageSendEvent::SendClosingSigned {
-                                                       node_id: counterparty_node_id.clone(),
-                                                       msg,
-                                               });
-                                       }
                                        break Ok((pending_forwards, pending_failures, chan.get().get_short_channel_id().expect("RAA should only work on a short-id-available channel"), chan.get().get_funding_txo().unwrap()))
                                },
                                hash_map::Entry::Vacant(_) => break Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel".to_owned(), msg.channel_id))
@@ -3662,62 +3929,6 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                Ok(())
        }
 
-       /// Begin Update fee process. Allowed only on an outbound channel.
-       /// If successful, will generate a UpdateHTLCs event, so you should probably poll
-       /// PeerManager::process_events afterwards.
-       /// Note: This API is likely to change!
-       /// (C-not exported) Cause its doc(hidden) anyway
-       #[doc(hidden)]
-       pub fn update_fee(&self, channel_id: [u8;32], feerate_per_kw: u32) -> Result<(), APIError> {
-               let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);
-               let counterparty_node_id;
-               let err: Result<(), _> = loop {
-                       let mut channel_state_lock = self.channel_state.lock().unwrap();
-                       let channel_state = &mut *channel_state_lock;
-
-                       match channel_state.by_id.entry(channel_id) {
-                               hash_map::Entry::Vacant(_) => return Err(APIError::APIMisuseError{err: format!("Failed to find corresponding channel for id {}", channel_id.to_hex())}),
-                               hash_map::Entry::Occupied(mut chan) => {
-                                       if !chan.get().is_outbound() {
-                                               return Err(APIError::APIMisuseError{err: "update_fee cannot be sent for an inbound channel".to_owned()});
-                                       }
-                                       if chan.get().is_awaiting_monitor_update() {
-                                               return Err(APIError::MonitorUpdateFailed);
-                                       }
-                                       if !chan.get().is_live() {
-                                               return Err(APIError::ChannelUnavailable{err: "Channel is either not yet fully established or peer is currently disconnected".to_owned()});
-                                       }
-                                       counterparty_node_id = chan.get().get_counterparty_node_id();
-                                       if let Some((update_fee, commitment_signed, monitor_update)) =
-                                                       break_chan_entry!(self, chan.get_mut().send_update_fee_and_commit(feerate_per_kw, &self.logger), channel_state, chan)
-                                       {
-                                               if let Err(_e) = self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), monitor_update) {
-                                                       unimplemented!();
-                                               }
-                                               log_debug!(self.logger, "Updating fee resulted in a commitment_signed for channel {}", log_bytes!(chan.get().channel_id()));
-                                               channel_state.pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs {
-                                                       node_id: chan.get().get_counterparty_node_id(),
-                                                       updates: msgs::CommitmentUpdate {
-                                                               update_add_htlcs: Vec::new(),
-                                                               update_fulfill_htlcs: Vec::new(),
-                                                               update_fail_htlcs: Vec::new(),
-                                                               update_fail_malformed_htlcs: Vec::new(),
-                                                               update_fee: Some(update_fee),
-                                                               commitment_signed,
-                                                       },
-                                               });
-                                       }
-                               },
-                       }
-                       return Ok(())
-               };
-
-               match handle_error!(self, err, counterparty_node_id) {
-                       Ok(_) => unreachable!(),
-                       Err(e) => { Err(APIError::APIMisuseError { err: e.err })}
-               }
-       }
-
        /// Process pending events from the `chain::Watch`, returning whether any events were processed.
        fn process_pending_monitor_events(&self) -> bool {
                let mut failed_channels = Vec::new();
@@ -3817,7 +4028,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                        });
                }
 
-               let has_update = has_monitor_update || !failed_htlcs.is_empty();
+               let has_update = has_monitor_update || !failed_htlcs.is_empty() || !handle_errors.is_empty();
                for (failures, channel_id) in failed_htlcs.drain(..) {
                        self.fail_holding_cell_htlcs(failures, channel_id);
                }
@@ -3829,6 +4040,63 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                has_update
        }
 
+       /// Check whether any channels have finished removing all pending updates after a shutdown
+       /// exchange and can now send a closing_signed.
+       /// Returns whether any closing_signed messages were generated.
+       fn maybe_generate_initial_closing_signed(&self) -> bool {
+               let mut handle_errors: Vec<(PublicKey, Result<(), _>)> = Vec::new();
+               let mut has_update = false;
+               {
+                       let mut channel_state_lock = self.channel_state.lock().unwrap();
+                       let channel_state = &mut *channel_state_lock;
+                       let by_id = &mut channel_state.by_id;
+                       let short_to_id = &mut channel_state.short_to_id;
+                       let pending_msg_events = &mut channel_state.pending_msg_events;
+
+                       by_id.retain(|channel_id, chan| {
+                               match chan.maybe_propose_closing_signed(&self.fee_estimator, &self.logger) {
+                                       Ok((msg_opt, tx_opt)) => {
+                                               if let Some(msg) = msg_opt {
+                                                       has_update = true;
+                                                       pending_msg_events.push(events::MessageSendEvent::SendClosingSigned {
+                                                               node_id: chan.get_counterparty_node_id(), msg,
+                                                       });
+                                               }
+                                               if let Some(tx) = tx_opt {
+                                                       // We're done with this channel. We got a closing_signed and sent back
+                                                       // a closing_signed with a closing transaction to broadcast.
+                                                       if let Some(short_id) = chan.get_short_channel_id() {
+                                                               short_to_id.remove(&short_id);
+                                                       }
+
+                                                       if let Ok(update) = self.get_channel_update_for_broadcast(&chan) {
+                                                               pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
+                                                                       msg: update
+                                                               });
+                                                       }
+
+                                                       log_info!(self.logger, "Broadcasting {}", log_tx!(tx));
+                                                       self.tx_broadcaster.broadcast_transaction(&tx);
+                                                       false
+                                               } else { true }
+                                       },
+                                       Err(e) => {
+                                               has_update = true;
+                                               let (close_channel, res) = convert_chan_err!(self, e, short_to_id, chan, channel_id);
+                                               handle_errors.push((chan.get_counterparty_node_id(), Err(res)));
+                                               !close_channel
+                                       }
+                               }
+                       });
+               }
+
+               for (counterparty_node_id, err) in handle_errors.drain(..) {
+                       let _ = handle_error!(self, err, counterparty_node_id);
+               }
+
+               has_update
+       }
+
        /// Handle a list of channel failures during a block_connected or block_disconnected call,
        /// pushing the channel monitor update (if any) to the background events queue and removing the
        /// Channel object.
@@ -3955,7 +4223,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
        #[cfg(any(test, feature = "fuzztarget", feature = "_test_utils"))]
        pub fn get_and_clear_pending_events(&self) -> Vec<events::Event> {
                let events = core::cell::RefCell::new(Vec::new());
-               let event_handler = |event| events.borrow_mut().push(event);
+               let event_handler = |event: &events::Event| events.borrow_mut().push(event.clone());
                self.process_pending_events(&event_handler);
                events.into_inner()
        }
@@ -3982,6 +4250,9 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> MessageSend
                        if self.check_free_holding_cells() {
                                result = NotifyOption::DoPersist;
                        }
+                       if self.maybe_generate_initial_closing_signed() {
+                               result = NotifyOption::DoPersist;
+                       }
 
                        let mut pending_events = Vec::new();
                        let mut channel_state = self.channel_state.lock().unwrap();
@@ -4023,13 +4294,13 @@ where
                                result = NotifyOption::DoPersist;
                        }
 
-                       let mut pending_events = std::mem::replace(&mut *self.pending_events.lock().unwrap(), vec![]);
+                       let mut pending_events = mem::replace(&mut *self.pending_events.lock().unwrap(), vec![]);
                        if !pending_events.is_empty() {
                                result = NotifyOption::DoPersist;
                        }
 
                        for event in pending_events.drain(..) {
-                               handler.handle_event(event);
+                               handler.handle_event(&event);
                        }
 
                        result
@@ -4454,7 +4725,6 @@ impl<Signer: Sign, M: Deref , T: Deref , K: Deref , F: Deref , L: Deref >
                                        &events::MessageSendEvent::BroadcastChannelUpdate { .. } => true,
                                        &events::MessageSendEvent::SendChannelUpdate { ref node_id, .. } => node_id != counterparty_node_id,
                                        &events::MessageSendEvent::HandleError { ref node_id, .. } => node_id != counterparty_node_id,
-                                       &events::MessageSendEvent::PaymentFailureNetworkUpdate { .. } => true,
                                        &events::MessageSendEvent::SendChannelRangeQuery { .. } => false,
                                        &events::MessageSendEvent::SendShortIdsQuery { .. } => false,
                                        &events::MessageSendEvent::SendReplyChannelRange { .. } => false,
@@ -4643,7 +4913,7 @@ impl_writeable_tlv_based!(HTLCPreviousHopData, {
 });
 
 impl Writeable for ClaimableHTLC {
-       fn write<W: Writer>(&self, writer: &mut W) -> Result<(), ::std::io::Error> {
+       fn write<W: Writer>(&self, writer: &mut W) -> Result<(), io::Error> {
                let payment_data = match &self.onion_payload {
                        OnionPayload::Invoice(data) => Some(data.clone()),
                        _ => None,
@@ -4700,14 +4970,60 @@ impl Readable for ClaimableHTLC {
        }
 }
 
-impl_writeable_tlv_based_enum!(HTLCSource,
-       (0, OutboundRoute) => {
-               (0, session_priv, required),
-               (2, first_hop_htlc_msat, required),
-               (4, path, vec_type),
-       }, ;
-       (1, PreviousHopData)
-);
+impl Readable for HTLCSource {
+       fn read<R: Read>(reader: &mut R) -> Result<Self, DecodeError> {
+               let id: u8 = Readable::read(reader)?;
+               match id {
+                       0 => {
+                               let mut session_priv: ::util::ser::OptionDeserWrapper<SecretKey> = ::util::ser::OptionDeserWrapper(None);
+                               let mut first_hop_htlc_msat: u64 = 0;
+                               let mut path = Some(Vec::new());
+                               let mut mpp_id = None;
+                               read_tlv_fields!(reader, {
+                                       (0, session_priv, required),
+                                       (1, mpp_id, option),
+                                       (2, first_hop_htlc_msat, required),
+                                       (4, path, vec_type),
+                               });
+                               if mpp_id.is_none() {
+                                       // For backwards compat, if there was no mpp_id written, use the session_priv bytes
+                                       // instead.
+                                       mpp_id = Some(MppId(*session_priv.0.unwrap().as_ref()));
+                               }
+                               Ok(HTLCSource::OutboundRoute {
+                                       session_priv: session_priv.0.unwrap(),
+                                       first_hop_htlc_msat: first_hop_htlc_msat,
+                                       path: path.unwrap(),
+                                       mpp_id: mpp_id.unwrap(),
+                               })
+                       }
+                       1 => Ok(HTLCSource::PreviousHopData(Readable::read(reader)?)),
+                       _ => Err(DecodeError::UnknownRequiredFeature),
+               }
+       }
+}
+
+impl Writeable for HTLCSource {
+       fn write<W: Writer>(&self, writer: &mut W) -> Result<(), ::io::Error> {
+               match self {
+                       HTLCSource::OutboundRoute { ref session_priv, ref first_hop_htlc_msat, ref path, mpp_id } => {
+                               0u8.write(writer)?;
+                               let mpp_id_opt = Some(mpp_id);
+                               write_tlv_fields!(writer, {
+                                       (0, session_priv, required),
+                                       (1, mpp_id_opt, option),
+                                       (2, first_hop_htlc_msat, required),
+                                       (4, path, vec_type),
+                                });
+                       }
+                       HTLCSource::PreviousHopData(ref field) => {
+                               1u8.write(writer)?;
+                               field.write(writer)?;
+                       }
+               }
+               Ok(())
+       }
+}
 
 impl_writeable_tlv_based_enum!(HTLCFailReason,
        (0, LightningError) => {
@@ -4747,7 +5063,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> Writeable f
         F::Target: FeeEstimator,
         L::Target: Logger,
 {
-       fn write<W: Writer>(&self, writer: &mut W) -> Result<(), ::std::io::Error> {
+       fn write<W: Writer>(&self, writer: &mut W) -> Result<(), io::Error> {
                let _consistency_lock = self.total_consistency_lock.write().unwrap();
 
                write_ver_prefix!(writer, SERIALIZATION_VERSION, MIN_SERIALIZATION_VERSION);
@@ -4828,12 +5144,21 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> Writeable f
                }
 
                let pending_outbound_payments = self.pending_outbound_payments.lock().unwrap();
-               (pending_outbound_payments.len() as u64).write(writer)?;
-               for session_priv in pending_outbound_payments.iter() {
-                       session_priv.write(writer)?;
+               // For backwards compat, write the session privs and their total length.
+               let mut num_pending_outbounds_compat: u64 = 0;
+               for (_, outbounds) in pending_outbound_payments.iter() {
+                       num_pending_outbounds_compat += outbounds.len() as u64;
+               }
+               num_pending_outbounds_compat.write(writer)?;
+               for (_, outbounds) in pending_outbound_payments.iter() {
+                       for outbound in outbounds.iter() {
+                               outbound.write(writer)?;
+                       }
                }
 
-               write_tlv_fields!(writer, {});
+               write_tlv_fields!(writer, {
+                       (1, pending_outbound_payments, required),
+               });
 
                Ok(())
        }
@@ -4943,7 +5268,7 @@ impl<'a, Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
         F::Target: FeeEstimator,
         L::Target: Logger,
 {
-       fn read<R: ::std::io::Read>(reader: &mut R, args: ChannelManagerReadArgs<'a, Signer, M, T, K, F, L>) -> Result<Self, DecodeError> {
+       fn read<R: io::Read>(reader: &mut R, args: ChannelManagerReadArgs<'a, Signer, M, T, K, F, L>) -> Result<Self, DecodeError> {
                let (blockhash, chan_manager) = <(BlockHash, ChannelManager<Signer, M, T, K, F, L>)>::read(reader, args)?;
                Ok((blockhash, Arc::new(chan_manager)))
        }
@@ -4957,7 +5282,7 @@ impl<'a, Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
         F::Target: FeeEstimator,
         L::Target: Logger,
 {
-       fn read<R: ::std::io::Read>(reader: &mut R, mut args: ChannelManagerReadArgs<'a, Signer, M, T, K, F, L>) -> Result<Self, DecodeError> {
+       fn read<R: io::Read>(reader: &mut R, mut args: ChannelManagerReadArgs<'a, Signer, M, T, K, F, L>) -> Result<Self, DecodeError> {
                let _ver = read_ver_prefix!(reader, SERIALIZATION_VERSION);
 
                let genesis_hash: BlockHash = Readable::read(reader)?;
@@ -4993,6 +5318,10 @@ impl<'a, Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
                                                channel.get_cur_counterparty_commitment_transaction_number() > monitor.get_cur_counterparty_commitment_number() ||
                                                channel.get_latest_monitor_update_id() < monitor.get_latest_update_id() {
                                        // But if the channel is behind of the monitor, close the channel:
+                                       log_error!(args.logger, "A ChannelManager is stale compared to the current ChannelMonitor!");
+                                       log_error!(args.logger, " The channel will be force-closed and the latest commitment transaction from the ChannelMonitor broadcast.");
+                                       log_error!(args.logger, " The ChannelMonitor for channel {} is at update_id {} but the ChannelManager is at update_id {}.",
+                                               log_bytes!(channel.channel_id()), monitor.get_latest_update_id(), channel.get_latest_monitor_update_id());
                                        let (_, mut new_failed_htlcs) = channel.force_shutdown(true);
                                        failed_htlcs.append(&mut new_failed_htlcs);
                                        monitor.broadcast_latest_holder_commitment_txn(&args.tx_broadcaster, &args.logger);
@@ -5082,15 +5411,23 @@ impl<'a, Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
                        }
                }
 
-               let pending_outbound_payments_count: u64 = Readable::read(reader)?;
-               let mut pending_outbound_payments: HashSet<[u8; 32]> = HashSet::with_capacity(cmp::min(pending_outbound_payments_count as usize, MAX_ALLOC_SIZE/32));
-               for _ in 0..pending_outbound_payments_count {
-                       if !pending_outbound_payments.insert(Readable::read(reader)?) {
-                               return Err(DecodeError::InvalidValue);
-                       }
+               let pending_outbound_payments_count_compat: u64 = Readable::read(reader)?;
+               let mut pending_outbound_payments_compat: HashMap<MppId, HashSet<[u8; 32]>> =
+                       HashMap::with_capacity(cmp::min(pending_outbound_payments_count_compat as usize, MAX_ALLOC_SIZE/32));
+               for _ in 0..pending_outbound_payments_count_compat {
+                       let session_priv = Readable::read(reader)?;
+                       if pending_outbound_payments_compat.insert(MppId(session_priv), [session_priv].iter().cloned().collect()).is_some() {
+                               return Err(DecodeError::InvalidValue)
+                       };
                }
 
-               read_tlv_fields!(reader, {});
+               let mut pending_outbound_payments = None;
+               read_tlv_fields!(reader, {
+                       (1, pending_outbound_payments, option),
+               });
+               if pending_outbound_payments.is_none() {
+                       pending_outbound_payments = Some(pending_outbound_payments_compat);
+               }
 
                let mut secp_ctx = Secp256k1::new();
                secp_ctx.seeded_randomize(&args.keys_manager.get_secure_random_bytes());
@@ -5111,7 +5448,7 @@ impl<'a, Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
                                pending_msg_events: Vec::new(),
                        }),
                        pending_inbound_payments: Mutex::new(pending_inbound_payments),
-                       pending_outbound_payments: Mutex::new(pending_outbound_payments),
+                       pending_outbound_payments: Mutex::new(pending_outbound_payments.unwrap()),
 
                        our_network_key: args.keys_manager.get_node_secret(),
                        our_network_pubkey: PublicKey::from_secret_key(&secp_ctx, &args.keys_manager.get_node_secret()),
@@ -5147,23 +5484,26 @@ impl<'a, Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
 mod tests {
        use bitcoin::hashes::Hash;
        use bitcoin::hashes::sha256::Hash as Sha256;
-       use core::sync::atomic::{AtomicBool, Ordering};
        use core::time::Duration;
        use ln::{PaymentPreimage, PaymentHash, PaymentSecret};
-       use ln::channelmanager::PersistenceNotifier;
+       use ln::channelmanager::{MppId, PaymentSendFailure};
        use ln::features::{InitFeatures, InvoiceFeatures};
        use ln::functional_test_utils::*;
        use ln::msgs;
        use ln::msgs::ChannelMessageHandler;
        use routing::router::{get_keysend_route, get_route};
+       use util::errors::APIError;
        use util::events::{Event, MessageSendEvent, MessageSendEventsProvider};
        use util::test_utils;
-       use std::sync::Arc;
-       use std::thread;
 
        #[cfg(feature = "std")]
        #[test]
        fn test_wait_timeout() {
+               use ln::channelmanager::PersistenceNotifier;
+               use sync::Arc;
+               use core::sync::atomic::{AtomicBool, Ordering};
+               use std::thread;
+
                let persistence_notifier = Arc::new(PersistenceNotifier::new());
                let thread_notifier = Arc::clone(&persistence_notifier);
 
@@ -5213,6 +5553,12 @@ mod tests {
                let node_chanmgrs = create_node_chanmgrs(3, &node_cfgs, &[None, None, None]);
                let nodes = create_network(3, &node_cfgs, &node_chanmgrs);
 
+               // All nodes start with a persistable update pending as `create_network` connects each node
+               // with all other nodes to make most tests simpler.
+               assert!(nodes[0].node.await_persistable_update_timeout(Duration::from_millis(1)));
+               assert!(nodes[1].node.await_persistable_update_timeout(Duration::from_millis(1)));
+               assert!(nodes[2].node.await_persistable_update_timeout(Duration::from_millis(1)));
+
                let mut chan = create_announced_chan_between_nodes(&nodes, 0, 1, InitFeatures::known(), InitFeatures::known());
 
                // We check that the channel info nodes have doesn't change too early, even though we try
@@ -5289,12 +5635,13 @@ mod tests {
 
                // First, send a partial MPP payment.
                let net_graph_msg_handler = &nodes[0].net_graph_msg_handler;
-               let route = get_route(&nodes[0].node.get_our_node_id(), &net_graph_msg_handler.network_graph.read().unwrap(), &nodes[1].node.get_our_node_id(), Some(InvoiceFeatures::known()), None, &Vec::new(), 100_000, TEST_FINAL_CLTV, &logger).unwrap();
+               let route = get_route(&nodes[0].node.get_our_node_id(), &net_graph_msg_handler.network_graph, &nodes[1].node.get_our_node_id(), Some(InvoiceFeatures::known()), None, &Vec::new(), 100_000, TEST_FINAL_CLTV, &logger).unwrap();
                let (payment_preimage, our_payment_hash, payment_secret) = get_payment_preimage_hash!(&nodes[1]);
+               let mpp_id = MppId([42; 32]);
                // Use the utility function send_payment_along_path to send the payment with MPP data which
                // indicates there are more HTLCs coming.
                let cur_height = CHAN_CONFIRM_DEPTH + 1; // route_payment calls send_payment, which adds 1 to the current height. So we do the same here to match.
-               nodes[0].node.send_payment_along_path(&route.paths[0], &our_payment_hash, &Some(payment_secret), 200_000, cur_height, &None).unwrap();
+               nodes[0].node.send_payment_along_path(&route.paths[0], &our_payment_hash, &Some(payment_secret), 200_000, cur_height, mpp_id, &None).unwrap();
                check_added_monitors!(nodes[0], 1);
                let mut events = nodes[0].node.get_and_clear_pending_msg_events();
                assert_eq!(events.len(), 1);
@@ -5324,7 +5671,7 @@ mod tests {
                expect_payment_failed!(nodes[0], our_payment_hash, true);
 
                // Send the second half of the original MPP payment.
-               nodes[0].node.send_payment_along_path(&route.paths[0], &our_payment_hash, &Some(payment_secret), 200_000, cur_height, &None).unwrap();
+               nodes[0].node.send_payment_along_path(&route.paths[0], &our_payment_hash, &Some(payment_secret), 200_000, cur_height, mpp_id, &None).unwrap();
                check_added_monitors!(nodes[0], 1);
                let mut events = nodes[0].node.get_and_clear_pending_msg_events();
                assert_eq!(events.len(), 1);
@@ -5362,7 +5709,8 @@ mod tests {
                nodes[0].node.handle_revoke_and_ack(&nodes[1].node.get_our_node_id(), &bs_third_raa);
                check_added_monitors!(nodes[0], 1);
 
-               // There's an existing bug that generates a PaymentSent event for each MPP path, so handle that here.
+               // Note that successful MPP payments will generate 1 event upon the first path's success. No
+               // further events will be generated for subsequence path successes.
                let events = nodes[0].node.get_and_clear_pending_events();
                match events[0] {
                        Event::PaymentSent { payment_preimage: ref preimage } => {
@@ -5370,12 +5718,6 @@ mod tests {
                        },
                        _ => panic!("Unexpected event"),
                }
-               match events[1] {
-                       Event::PaymentSent { payment_preimage: ref preimage } => {
-                               assert_eq!(payment_preimage, *preimage);
-                       },
-                       _ => panic!("Unexpected event"),
-               }
        }
 
        #[test]
@@ -5396,7 +5738,7 @@ mod tests {
                let (payment_preimage, payment_hash, _) = route_payment(&nodes[0], &expected_route, 100_000);
 
                // Next, attempt a keysend payment and make sure it fails.
-               let route = get_route(&nodes[0].node.get_our_node_id(), &nodes[0].net_graph_msg_handler.network_graph.read().unwrap(), &expected_route.last().unwrap().node.get_our_node_id(), Some(InvoiceFeatures::known()), None, &Vec::new(), 100_000, TEST_FINAL_CLTV, &logger).unwrap();
+               let route = get_route(&nodes[0].node.get_our_node_id(), &nodes[0].net_graph_msg_handler.network_graph, &expected_route.last().unwrap().node.get_our_node_id(), Some(InvoiceFeatures::known()), None, &Vec::new(), 100_000, TEST_FINAL_CLTV, &logger).unwrap();
                nodes[0].node.send_spontaneous_payment(&route, Some(payment_preimage)).unwrap();
                check_added_monitors!(nodes[0], 1);
                let mut events = nodes[0].node.get_and_clear_pending_msg_events();
@@ -5424,7 +5766,7 @@ mod tests {
 
                // To start (2), send a keysend payment but don't claim it.
                let payment_preimage = PaymentPreimage([42; 32]);
-               let route = get_route(&nodes[0].node.get_our_node_id(), &nodes[0].net_graph_msg_handler.network_graph.read().unwrap(), &expected_route.last().unwrap().node.get_our_node_id(), Some(InvoiceFeatures::known()), None, &Vec::new(), 100_000, TEST_FINAL_CLTV, &logger).unwrap();
+               let route = get_route(&nodes[0].node.get_our_node_id(), &nodes[0].net_graph_msg_handler.network_graph, &expected_route.last().unwrap().node.get_our_node_id(), Some(InvoiceFeatures::known()), None, &Vec::new(), 100_000, TEST_FINAL_CLTV, &logger).unwrap();
                let payment_hash = nodes[0].node.send_spontaneous_payment(&route, Some(payment_preimage)).unwrap();
                check_added_monitors!(nodes[0], 1);
                let mut events = nodes[0].node.get_and_clear_pending_msg_events();
@@ -5476,9 +5818,9 @@ mod tests {
                nodes[1].node.peer_connected(&payer_pubkey, &msgs::Init { features: InitFeatures::known() });
 
                let _chan = create_chan_between_nodes(&nodes[0], &nodes[1], InitFeatures::known(), InitFeatures::known());
-               let network_graph = nodes[0].net_graph_msg_handler.network_graph.read().unwrap();
+               let network_graph = &nodes[0].net_graph_msg_handler.network_graph;
                let first_hops = nodes[0].node.list_usable_channels();
-               let route = get_keysend_route(&payer_pubkey, &network_graph, &payee_pubkey,
+               let route = get_keysend_route(&payer_pubkey, network_graph, &payee_pubkey,
                                   Some(&first_hops.iter().collect::<Vec<_>>()), &vec![], 10000, 40,
                                   nodes[0].logger).unwrap();
 
@@ -5512,9 +5854,9 @@ mod tests {
                nodes[1].node.peer_connected(&payer_pubkey, &msgs::Init { features: InitFeatures::known() });
 
                let _chan = create_chan_between_nodes(&nodes[0], &nodes[1], InitFeatures::known(), InitFeatures::known());
-               let network_graph = nodes[0].net_graph_msg_handler.network_graph.read().unwrap();
+               let network_graph = &nodes[0].net_graph_msg_handler.network_graph;
                let first_hops = nodes[0].node.list_usable_channels();
-               let route = get_keysend_route(&payer_pubkey, &network_graph, &payee_pubkey,
+               let route = get_keysend_route(&payer_pubkey, network_graph, &payee_pubkey,
                                   Some(&first_hops.iter().collect::<Vec<_>>()), &vec![], 10000, 40,
                                   nodes[0].logger).unwrap();
 
@@ -5534,6 +5876,39 @@ mod tests {
 
                nodes[1].logger.assert_log_contains("lightning::ln::channelmanager".to_string(), "We don't support MPP keysend payments".to_string(), 1);
        }
+
+       #[test]
+       fn test_multi_hop_missing_secret() {
+               let chanmon_cfgs = create_chanmon_cfgs(4);
+               let node_cfgs = create_node_cfgs(4, &chanmon_cfgs);
+               let node_chanmgrs = create_node_chanmgrs(4, &node_cfgs, &[None, None, None, None]);
+               let nodes = create_network(4, &node_cfgs, &node_chanmgrs);
+
+               let chan_1_id = create_announced_chan_between_nodes(&nodes, 0, 1, InitFeatures::known(), InitFeatures::known()).0.contents.short_channel_id;
+               let chan_2_id = create_announced_chan_between_nodes(&nodes, 0, 2, InitFeatures::known(), InitFeatures::known()).0.contents.short_channel_id;
+               let chan_3_id = create_announced_chan_between_nodes(&nodes, 1, 3, InitFeatures::known(), InitFeatures::known()).0.contents.short_channel_id;
+               let chan_4_id = create_announced_chan_between_nodes(&nodes, 2, 3, InitFeatures::known(), InitFeatures::known()).0.contents.short_channel_id;
+               let logger = test_utils::TestLogger::new();
+
+               // Marshall an MPP route.
+               let (_, payment_hash, _) = get_payment_preimage_hash!(&nodes[3]);
+               let net_graph_msg_handler = &nodes[0].net_graph_msg_handler;
+               let mut route = get_route(&nodes[0].node.get_our_node_id(), &net_graph_msg_handler.network_graph, &nodes[3].node.get_our_node_id(), Some(InvoiceFeatures::known()), None, &[], 100000, TEST_FINAL_CLTV, &logger).unwrap();
+               let path = route.paths[0].clone();
+               route.paths.push(path);
+               route.paths[0][0].pubkey = nodes[1].node.get_our_node_id();
+               route.paths[0][0].short_channel_id = chan_1_id;
+               route.paths[0][1].short_channel_id = chan_3_id;
+               route.paths[1][0].pubkey = nodes[2].node.get_our_node_id();
+               route.paths[1][0].short_channel_id = chan_2_id;
+               route.paths[1][1].short_channel_id = chan_4_id;
+
+               match nodes[0].node.send_payment(&route, payment_hash, &None).unwrap_err() {
+                       PaymentSendFailure::ParameterError(APIError::APIMisuseError { ref err }) => {
+                               assert!(regex::Regex::new(r"Payment secret is required for multi-path payments").unwrap().is_match(err))                        },
+                       _ => panic!("unexpected error")
+               }
+       }
 }
 
 #[cfg(all(any(test, feature = "_test_utils"), feature = "unstable"))]
@@ -5545,7 +5920,7 @@ pub mod bench {
        use ln::channelmanager::{BestBlock, ChainParameters, ChannelManager, PaymentHash, PaymentPreimage};
        use ln::features::{InitFeatures, InvoiceFeatures};
        use ln::functional_test_utils::*;
-       use ln::msgs::ChannelMessageHandler;
+       use ln::msgs::{ChannelMessageHandler, Init};
        use routing::network_graph::NetworkGraph;
        use routing::router::get_route;
        use util::test_utils;
@@ -5608,6 +5983,8 @@ pub mod bench {
                });
                let node_b_holder = NodeHolder { node: &node_b };
 
+               node_a.peer_connected(&node_b.get_our_node_id(), &Init { features: InitFeatures::known() });
+               node_b.peer_connected(&node_a.get_our_node_id(), &Init { features: InitFeatures::known() });
                node_a.create_channel(node_b.get_our_node_id(), 8_000_000, 100_000_000, 42, None).unwrap();
                node_b.handle_open_channel(&node_a.get_our_node_id(), InitFeatures::known(), &get_event_msg!(node_a_holder, MessageSendEvent::SendOpenChannel, node_b.get_our_node_id()));
                node_a.handle_accept_channel(&node_b.get_our_node_id(), InitFeatures::known(), &get_event_msg!(node_b_holder, MessageSendEvent::SendAcceptChannel, node_a.get_our_node_id()));