Merge pull request #1080 from valentinewallace/2021-09-dup-chan-outpoint
author	Matt Corallo <649246+TheBlueMatt@users.noreply.github.com>	Thu, 30 Sep 2021 00:01:35 +0000 (00:01 +0000)
committer	GitHub <noreply@github.com>	Thu, 30 Sep 2021 00:01:35 +0000 (00:01 +0000)
Fix fuzzer-found panic from duplicate channel outpoint

lightning/src/ln/channelmanager.rs

index a0ffe263c84e492773bc8d79b2b9ccfe90910095,ef16af455adc6f02f4c8131d680f994b8af176ae..f154ab3dc79670186d46c1433462a7d256944e00
@@@ -43,6 -43,7 +43,6 @@@ use chain::transaction::{OutPoint, Tran
  // Since this struct is returned in `list_channels` methods, expose it here in case users want to
  // construct one themselves.
  use ln::{PaymentHash, PaymentPreimage, PaymentSecret};
 -pub use ln::channel::CounterpartyForwardingInfo;
  use ln::channel::{Channel, ChannelError, ChannelUpdateStatus, UpdateFulfillCommitFetch};
  use ln::features::{InitFeatures, NodeFeatures};
  use routing::router::{Route, RouteHop};
@@@ -52,9 -53,9 +52,9 @@@ use ln::onion_utils
  use ln::msgs::{ChannelMessageHandler, DecodeError, LightningError, OptionalField};
  use chain::keysinterface::{Sign, KeysInterface, KeysManager, InMemorySigner};
  use util::config::UserConfig;
 -use util::events::{EventHandler, EventsProvider, MessageSendEvent, MessageSendEventsProvider};
 +use util::events::{EventHandler, EventsProvider, MessageSendEvent, MessageSendEventsProvider, ClosureReason};
  use util::{byte_utils, events};
 -use util::ser::{Readable, ReadableArgs, MaybeReadable, Writeable, Writer};
 +use util::ser::{BigSize, FixedLengthReader, Readable, ReadableArgs, MaybeReadable, Writeable, Writer};
  use util::chacha20::{ChaCha20, ChaChaReader};
  use util::logger::{Logger, Level};
  use util::errors::APIError;
@@@ -172,22 -173,6 +172,22 @@@ struct ClaimableHTLC 
        onion_payload: OnionPayload,
  }
  
 +/// A payment identifier used to correlate an MPP payment's per-path HTLC sources internally.
 +#[derive(Hash, Copy, Clone, PartialEq, Eq, Debug)]
 +pub(crate) struct MppId(pub [u8; 32]);
 +
 +impl Writeable for MppId {
 +      fn write<W: Writer>(&self, w: &mut W) -> Result<(), io::Error> {
 +              self.0.write(w)
 +      }
 +}
 +
 +impl Readable for MppId {
 +      fn read<R: Read>(r: &mut R) -> Result<Self, DecodeError> {
 +              let buf: [u8; 32] = Readable::read(r)?;
 +              Ok(MppId(buf))
 +      }
 +}
  /// Tracks the inbound corresponding to an outbound HTLC
  #[derive(Clone, PartialEq)]
  pub(crate) enum HTLCSource {
                /// Technically we can recalculate this from the route, but we cache it here to avoid
                /// doing a double-pass on route when we get a failure back
                first_hop_htlc_msat: u64,
 +              mpp_id: MppId,
        },
  }
  #[cfg(test)]
@@@ -208,7 -192,6 +208,7 @@@ impl HTLCSource 
                        path: Vec::new(),
                        session_priv: SecretKey::from_slice(&[1; 32]).unwrap(),
                        first_hop_htlc_msat: 0,
 +                      mpp_id: MppId([2; 32]),
                }
        }
  }
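
A minimal sketch of the `MppId` round-trip, using only `std::io` (the real impls above go through LDK's `Writer`/`Readable` traits and return `DecodeError` on read, so the error type here is a simplification):

    use std::io::{self, Read, Write};

    #[derive(Hash, Copy, Clone, PartialEq, Eq, Debug)]
    struct MppId(pub [u8; 32]);

    impl MppId {
        // Serialize as the raw 32 bytes, matching the Writeable impl above.
        fn write<W: Write>(&self, w: &mut W) -> Result<(), io::Error> {
            w.write_all(&self.0)
        }
        // Deserialize by reading exactly 32 bytes back.
        fn read<R: Read>(r: &mut R) -> Result<Self, io::Error> {
            let mut buf = [0u8; 32];
            r.read_exact(&mut buf)?;
            Ok(MppId(buf))
        }
    }

    fn main() -> Result<(), io::Error> {
        let id = MppId([2; 32]);
        let mut bytes = Vec::new();
        id.write(&mut bytes)?;
        assert_eq!(MppId::read(&mut bytes.as_slice())?, id);
        Ok(())
    }
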
@@@ -242,7 -225,6 +242,7 @@@ type ShutdownResult = (Option<(OutPoint
  
  struct MsgHandleErrInternal {
        err: msgs::LightningError,
 +      chan_id: Option<[u8; 32]>, // If Some, a channel of ours has been closed
        shutdown_finish: Option<(ShutdownResult, Option<msgs::ChannelUpdate>)>,
  }
  impl MsgHandleErrInternal {
                                        },
                                },
                        },
 +                      chan_id: None,
                        shutdown_finish: None,
                }
        }
                                err,
                                action: msgs::ErrorAction::IgnoreError,
                        },
 +                      chan_id: None,
                        shutdown_finish: None,
                }
        }
        #[inline]
        fn from_no_close(err: msgs::LightningError) -> Self {
 -              Self { err, shutdown_finish: None }
 +              Self { err, chan_id: None, shutdown_finish: None }
        }
        #[inline]
        fn from_finish_shutdown(err: String, channel_id: [u8; 32], shutdown_res: ShutdownResult, channel_update: Option<msgs::ChannelUpdate>) -> Self {
                                        },
                                },
                        },
 +                      chan_id: Some(channel_id),
                        shutdown_finish: Some((shutdown_res, channel_update)),
                }
        }
                                        },
                                },
                        },
 +                      chan_id: None,
                        shutdown_finish: None,
                }
        }
@@@ -489,17 -467,14 +489,17 @@@ pub struct ChannelManager<Signer: Sign
        /// The session_priv bytes of outbound payments which are pending resolution.
        /// The authoritative state of these HTLCs resides either within Channels or ChannelMonitors
        /// (if the channel has been force-closed), however we track them here to prevent duplicative
 -      /// PaymentSent/PaymentFailed events. Specifically, in the case of a duplicative
 +      /// PaymentSent/PaymentPathFailed events. Specifically, in the case of a duplicative
        /// update_fulfill_htlc message after a reconnect, we may "claim" a payment twice.
        /// Additionally, because ChannelMonitors are often not re-serialized after connecting block(s)
        /// which may generate a claim event, we may receive similar duplicate claim/fail MonitorEvents
        /// after reloading from disk while replaying blocks against ChannelMonitors.
        ///
 +      /// Each payment's map entry contains the session_priv bytes of every one of its MPP parts
 +      /// (even for payments sent over a single path).
 +      ///
        /// Locked *after* channel_state.
 -      pending_outbound_payments: Mutex<HashSet<[u8; 32]>>,
 +      pending_outbound_payments: Mutex<HashMap<MppId, HashSet<[u8; 32]>>>,
  
        our_network_key: SecretKey,
        our_network_pubkey: PublicKey,
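
The `pending_outbound_payments` change above is the heart of the new tracking: one entry per payment, keyed by `MppId`, holding the `session_priv` bytes of each in-flight path. A simplified std-only sketch of the lifecycle (function names hypothetical):

    use std::collections::{HashMap, HashSet};

    type MppId = [u8; 32];
    type SessionPriv = [u8; 32];

    // Register each path of a (possibly multi-path) payment under one MppId.
    fn register_path(pending: &mut HashMap<MppId, HashSet<SessionPriv>>,
                     mpp_id: MppId, session_priv: SessionPriv) {
        let sessions = pending.entry(mpp_id).or_insert_with(HashSet::new);
        assert!(sessions.insert(session_priv), "duplicate session_priv");
    }

    // On a path failure, drop that path; report whether every path has now failed.
    fn fail_path(pending: &mut HashMap<MppId, HashSet<SessionPriv>>,
                 mpp_id: MppId, session_priv: SessionPriv) -> Option<bool> {
        let sessions = pending.get_mut(&mpp_id)?;
        if !sessions.remove(&session_priv) { return None; } // duplicative failure
        let all_paths_failed = sessions.is_empty();
        if all_paths_failed { pending.remove(&mpp_id); }
        Some(all_paths_failed)
    }

    fn main() {
        let mut pending = HashMap::new();
        register_path(&mut pending, [42; 32], [1; 32]);
        register_path(&mut pending, [42; 32], [2; 32]);
        assert_eq!(fail_path(&mut pending, [42; 32], [1; 32]), Some(false));
        assert_eq!(fail_path(&mut pending, [42; 32], [2; 32]), Some(true));
        assert_eq!(fail_path(&mut pending, [42; 32], [2; 32]), None); // duplicate
    }
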
@@@ -651,19 -626,6 +651,19 @@@ const CHECK_CLTV_EXPIRY_SANITY: u32 = M
  #[allow(dead_code)]
  const CHECK_CLTV_EXPIRY_SANITY_2: u32 = MIN_CLTV_EXPIRY_DELTA as u32 - LATENCY_GRACE_PERIOD_BLOCKS - 2*CLTV_CLAIM_BUFFER;
  
 +/// Information needed for constructing an invoice route hint for this channel.
 +#[derive(Clone, Debug, PartialEq)]
 +pub struct CounterpartyForwardingInfo {
 +      /// Base routing fee in millisatoshis.
 +      pub fee_base_msat: u32,
 +      /// Amount in millionths of a satoshi the channel will charge per transferred satoshi.
 +      pub fee_proportional_millionths: u32,
 +      /// The minimum difference in cltv_expiry between an inbound HTLC and its outgoing counterpart,
 +      /// such that the outgoing HTLC is forwardable to this counterparty. See `msgs::ChannelUpdate`'s
 +      /// `cltv_expiry_delta` for more details.
 +      pub cltv_expiry_delta: u16,
 +}
 +
  /// Channel parameters which apply to our counterparty. These are split out from [`ChannelDetails`]
  /// to better separate parameters.
  #[derive(Clone, Debug, PartialEq)]
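
For context, the `CounterpartyForwardingInfo` fields above follow BOLT 7's `channel_update` fee formula: forwarding `amt_msat` through the counterparty is expected to cost `fee_base_msat + amt_msat * fee_proportional_millionths / 1_000_000`. A quick illustration (std-only, struct redefined locally):

    struct CounterpartyForwardingInfo {
        fee_base_msat: u32,
        fee_proportional_millionths: u32,
        cltv_expiry_delta: u16,
    }

    // BOLT 7 fee formula for forwarding `amt_msat` over this channel.
    fn forwarding_fee_msat(info: &CounterpartyForwardingInfo, amt_msat: u64) -> u64 {
        info.fee_base_msat as u64
            + amt_msat * info.fee_proportional_millionths as u64 / 1_000_000
    }

    fn main() {
        let info = CounterpartyForwardingInfo {
            fee_base_msat: 1_000,             // 1 sat base fee
            fee_proportional_millionths: 100, // 0.01%
            cltv_expiry_delta: 40,
        };
        // Forwarding 1_000_000 msat costs 1_000 + 100 = 1_100 msat.
        assert_eq!(forwarding_fee_msat(&info, 1_000_000), 1_100);
        let _ = info.cltv_expiry_delta;
    }
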
@@@ -818,13 -780,12 +818,13 @@@ macro_rules! handle_error 
        ($self: ident, $internal: expr, $counterparty_node_id: expr) => {
                match $internal {
                        Ok(msg) => Ok(msg),
 -                      Err(MsgHandleErrInternal { err, shutdown_finish }) => {
 +                      Err(MsgHandleErrInternal { err, chan_id, shutdown_finish }) => {
                                #[cfg(debug_assertions)]
                                {
                                        // In testing, ensure there are no deadlocks where the lock is already held upon
                                        // entering the macro.
                                        assert!($self.channel_state.try_lock().is_ok());
 +                                      assert!($self.pending_events.try_lock().is_ok());
                                }
  
                                let mut msg_events = Vec::with_capacity(2);
                                                        msg: update
                                                });
                                        }
 +                                      if let Some(channel_id) = chan_id {
 +                                              $self.pending_events.lock().unwrap().push(events::Event::ChannelClosed { channel_id,  reason: ClosureReason::ProcessingError { err: err.err.clone() } });
 +                                      }
                                }
  
                                log_error!($self.logger, "{}", err.err);
@@@ -1180,7 -1138,7 +1180,7 @@@ impl<Signer: Sign, M: Deref, T: Deref, 
                                pending_msg_events: Vec::new(),
                        }),
                        pending_inbound_payments: Mutex::new(HashMap::new()),
 -                      pending_outbound_payments: Mutex::new(HashSet::new()),
 +                      pending_outbound_payments: Mutex::new(HashMap::new()),
  
                        our_network_key: keys_manager.get_node_secret(),
                        our_network_pubkey: PublicKey::from_secret_key(&secp_ctx, &keys_manager.get_node_secret()),
                                                                msg: channel_update
                                                        });
                                                }
 +                                              if let Ok(mut pending_events_lock) = self.pending_events.lock() {
 +                                                      pending_events_lock.push(events::Event::ChannelClosed {
 +                                                              channel_id: *channel_id,
 +                                                              reason: ClosureReason::HolderForceClosed
 +                                                      });
 +                                              }
                                        }
                                        break Ok(());
                                },
                }
        }
  
 -      fn force_close_channel_with_peer(&self, channel_id: &[u8; 32], peer_node_id: Option<&PublicKey>) -> Result<PublicKey, APIError> {
 +      /// `peer_node_id` should be set when we receive a message from a peer, but not set when the
 +      /// user force-closes. When both it and `peer_msg` are set, the `ChannelClosed` event
 +      /// re-exposes the peer's message via `CounterpartyForceClosed`; when unset, the reason is
 +      /// `HolderForceClosed`.
 +      fn force_close_channel_with_peer(&self, channel_id: &[u8; 32], peer_node_id: Option<&PublicKey>, peer_msg: Option<&String>) -> Result<PublicKey, APIError> {
                let mut chan = {
                        let mut channel_state_lock = self.channel_state.lock().unwrap();
                        let channel_state = &mut *channel_state_lock;
                                if let Some(short_id) = chan.get().get_short_channel_id() {
                                        channel_state.short_to_id.remove(&short_id);
                                }
 +                              let mut pending_events_lock = self.pending_events.lock().unwrap();
 +                              if peer_node_id.is_some() {
 +                                      if let Some(peer_msg) = peer_msg {
 +                                              pending_events_lock.push(events::Event::ChannelClosed { channel_id: *channel_id, reason: ClosureReason::CounterpartyForceClosed { peer_msg: peer_msg.to_string() } });
 +                                      }
 +                              } else {
 +                                      pending_events_lock.push(events::Event::ChannelClosed { channel_id: *channel_id, reason: ClosureReason::HolderForceClosed });
 +                              }
                                chan.remove_entry().1
                        } else {
                                return Err(APIError::ChannelUnavailable{err: "No such channel".to_owned()});
        /// the chain and rejecting new HTLCs on the given channel. Fails if channel_id is unknown to the manager.
        pub fn force_close_channel(&self, channel_id: &[u8; 32]) -> Result<(), APIError> {
                let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);
 -              match self.force_close_channel_with_peer(channel_id, None) {
 +              match self.force_close_channel_with_peer(channel_id, None, None) {
                        Ok(counterparty_node_id) => {
                                self.channel_state.lock().unwrap().pending_msg_events.push(
                                        events::MessageSendEvent::HandleError {
        }
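
A sketch of how the `ChannelClosed` reason is chosen in the force-close path above, with a pared-down `ClosureReason` (the real enum lives in `util::events`):

    #[derive(Debug, PartialEq)]
    enum ClosureReason {
        CounterpartyForceClosed { peer_msg: String },
        HolderForceClosed,
    }

    // peer_node_id is Some when the close was triggered by a peer's error message,
    // None when the local user called force_close_channel.
    fn closure_reason(peer_node_id: Option<&[u8]>, peer_msg: Option<&str>) -> Option<ClosureReason> {
        if peer_node_id.is_some() {
            peer_msg.map(|m| ClosureReason::CounterpartyForceClosed { peer_msg: m.to_string() })
        } else {
            Some(ClosureReason::HolderForceClosed)
        }
    }

    fn main() {
        assert_eq!(closure_reason(None, None), Some(ClosureReason::HolderForceClosed));
        assert_eq!(
            closure_reason(Some(&[0u8; 33][..]), Some("internal error")),
            Some(ClosureReason::CounterpartyForceClosed { peer_msg: "internal error".into() })
        );
    }
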
  
        // Only public for testing; this should otherwise never be called directly
 -      pub(crate) fn send_payment_along_path(&self, path: &Vec<RouteHop>, payment_hash: &PaymentHash, payment_secret: &Option<PaymentSecret>, total_value: u64, cur_height: u32, keysend_preimage: &Option<PaymentPreimage>) -> Result<(), APIError> {
 +      pub(crate) fn send_payment_along_path(&self, path: &Vec<RouteHop>, payment_hash: &PaymentHash, payment_secret: &Option<PaymentSecret>, total_value: u64, cur_height: u32, mpp_id: MppId, keysend_preimage: &Option<PaymentPreimage>) -> Result<(), APIError> {
                log_trace!(self.logger, "Attempting to send payment for path with next hop {}", path.first().unwrap().short_channel_id);
                let prng_seed = self.keys_manager.get_secure_random_bytes();
                let session_priv_bytes = self.keys_manager.get_secure_random_bytes();
                let onion_packet = onion_utils::construct_onion_packet(onion_payloads, onion_keys, prng_seed, payment_hash);
  
                let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);
 -              assert!(self.pending_outbound_payments.lock().unwrap().insert(session_priv_bytes));
 +              let mut pending_outbounds = self.pending_outbound_payments.lock().unwrap();
 +              let sessions = pending_outbounds.entry(mpp_id).or_insert(HashSet::new());
 +              assert!(sessions.insert(session_priv_bytes));
  
                let err: Result<(), _> = loop {
                        let mut channel_lock = self.channel_state.lock().unwrap();
                                                path: path.clone(),
                                                session_priv: session_priv.clone(),
                                                first_hop_htlc_msat: htlc_msat,
 +                                              mpp_id,
                                        }, onion_packet, &self.logger), channel_state, chan)
                                } {
                                        Some((update_add, commitment_signed, monitor_update)) => {
                let mut total_value = 0;
                let our_node_id = self.get_our_node_id();
                let mut path_errs = Vec::with_capacity(route.paths.len());
 +              let mpp_id = MppId(self.keys_manager.get_secure_random_bytes());
                'path_check: for path in route.paths.iter() {
                        if path.len() < 1 || path.len() > 20 {
                                path_errs.push(Err(APIError::RouteError{err: "Path didn't go anywhere/had bogus size"}));
                let cur_height = self.best_block.read().unwrap().height() + 1;
                let mut results = Vec::new();
                for path in route.paths.iter() {
 -                      results.push(self.send_payment_along_path(&path, &payment_hash, payment_secret, total_value, cur_height, &keysend_preimage));
 +                      results.push(self.send_payment_along_path(&path, &payment_hash, payment_secret, total_value, cur_height, mpp_id, &keysend_preimage));
                }
                let mut has_ok = false;
                let mut has_err = false;
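
Below, the per-path results are folded into a single outcome. A sketch of that aggregation with a simplified error type, assuming the semantics of LDK's `PaymentSendFailure::PartialFailure` (partial dispatch means the payment may still succeed, so the caller must not blindly retry the whole thing):

    #[derive(Debug)]
    enum SendOutcome {
        AllPathsOk,
        AllPathsFailed,
        // Some paths were dispatched and some failed.
        PartialFailure(Vec<Result<(), String>>),
    }

    fn fold_results(results: Vec<Result<(), String>>) -> SendOutcome {
        let has_ok = results.iter().any(|r| r.is_ok());
        let has_err = results.iter().any(|r| r.is_err());
        match (has_ok, has_err) {
            (true, true) => SendOutcome::PartialFailure(results),
            (false, _) => SendOutcome::AllPathsFailed,
            _ => SendOutcome::AllPathsOk,
        }
    }

    fn main() {
        let outcome = fold_results(vec![Ok(()), Err("monitor update failed".into())]);
        println!("{:?}", outcome); // PartialFailure([...])
    }
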
                                                                                        if let Some(short_id) = channel.get_short_channel_id() {
                                                                                                channel_state.short_to_id.remove(&short_id);
                                                                                        }
 +                                                                                      // ChannelClosed event is generated by handle_error for us.
                                                                                        Err(MsgHandleErrInternal::from_finish_shutdown(msg, channel_id, channel.force_shutdown(true), self.get_channel_update_for_broadcast(&channel).ok()))
                                                                                },
                                                                                ChannelError::CloseDelayBroadcast(_) => { panic!("Wait is only generated on receipt of channel_reestablish, which is handled by try_chan_entry, we don't bother to support it here"); }
                                        self.fail_htlc_backwards_internal(channel_state,
                                                htlc_src, &payment_hash, HTLCFailReason::Reason { failure_code, data: onion_failure_data});
                                },
 -                              HTLCSource::OutboundRoute { session_priv, .. } => {
 -                                      if {
 -                                              let mut session_priv_bytes = [0; 32];
 -                                              session_priv_bytes.copy_from_slice(&session_priv[..]);
 -                                              self.pending_outbound_payments.lock().unwrap().remove(&session_priv_bytes)
 -                                      } {
 -                                              self.pending_events.lock().unwrap().push(
 -                                                      events::Event::PaymentFailed {
 -                                                              payment_hash,
 -                                                              rejected_by_dest: false,
 -#[cfg(test)]
 -                                                              error_code: None,
 -#[cfg(test)]
 -                                                              error_data: None,
 +                              HTLCSource::OutboundRoute { session_priv, mpp_id, path, .. } => {
 +                                      let mut session_priv_bytes = [0; 32];
 +                                      session_priv_bytes.copy_from_slice(&session_priv[..]);
 +                                      let mut outbounds = self.pending_outbound_payments.lock().unwrap();
 +                                      if let hash_map::Entry::Occupied(mut sessions) = outbounds.entry(mpp_id) {
 +                                              if sessions.get_mut().remove(&session_priv_bytes) {
 +                                                      self.pending_events.lock().unwrap().push(
 +                                                              events::Event::PaymentPathFailed {
 +                                                                      payment_hash,
 +                                                                      rejected_by_dest: false,
 +                                                                      network_update: None,
 +                                                                      all_paths_failed: sessions.get().len() == 0,
 +                                                                      path: path.clone(),
 +                                                                      #[cfg(test)]
 +                                                                      error_code: None,
 +                                                                      #[cfg(test)]
 +                                                                      error_data: None,
 +                                                              }
 +                                                      );
 +                                                      if sessions.get().len() == 0 {
 +                                                              sessions.remove();
                                                        }
 -                                              )
 +                                              }
                                        } else {
                                                log_trace!(self.logger, "Received duplicative fail for HTLC with payment_hash {}", log_bytes!(payment_hash.0));
                                        }
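
The `Entry::Occupied` pattern above both removes the failed path and, once the set drains, removes the whole map entry without a second lookup. A std-only sketch of the same idiom:

    use std::collections::hash_map::{Entry, HashMap};
    use std::collections::HashSet;

    fn main() {
        let mut outbounds: HashMap<u32, HashSet<u32>> = HashMap::new();
        outbounds.insert(7, [1u32, 2].iter().cloned().collect());

        if let Entry::Occupied(mut sessions) = outbounds.entry(7) {
            if sessions.get_mut().remove(&1) {
                let all_paths_failed = sessions.get().is_empty();
                if all_paths_failed {
                    // Last path gone: drop the payment's entry entirely.
                    sessions.remove();
                }
                println!("all_paths_failed = {}", all_paths_failed);
            } else {
                println!("duplicative fail, ignoring");
            }
        } else {
            println!("unknown payment, ignoring");
        }
        assert!(outbounds.contains_key(&7)); // one path ([2]) still pending
    }
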
                // from block_connected which may run during initialization prior to the chain_monitor
                // being fully configured. See the docs for `ChannelManagerReadArgs` for more.
                match source {
 -                      HTLCSource::OutboundRoute { ref path, session_priv, .. } => {
 -                              if {
 -                                      let mut session_priv_bytes = [0; 32];
 -                                      session_priv_bytes.copy_from_slice(&session_priv[..]);
 -                                      !self.pending_outbound_payments.lock().unwrap().remove(&session_priv_bytes)
 -                              } {
 +                      HTLCSource::OutboundRoute { ref path, session_priv, mpp_id, .. } => {
 +                              let mut session_priv_bytes = [0; 32];
 +                              session_priv_bytes.copy_from_slice(&session_priv[..]);
 +                              let mut outbounds = self.pending_outbound_payments.lock().unwrap();
 +                              let mut all_paths_failed = false;
 +                              if let hash_map::Entry::Occupied(mut sessions) = outbounds.entry(mpp_id) {
 +                                      if !sessions.get_mut().remove(&session_priv_bytes) {
 +                                              log_trace!(self.logger, "Received duplicative fail for HTLC with payment_hash {}", log_bytes!(payment_hash.0));
 +                                              return;
 +                                      }
 +                                      if sessions.get().len() == 0 {
 +                                              all_paths_failed = true;
 +                                              sessions.remove();
 +                                      }
 +                              } else {
                                        log_trace!(self.logger, "Received duplicative fail for HTLC with payment_hash {}", log_bytes!(payment_hash.0));
                                        return;
                                }
                                match &onion_error {
                                        &HTLCFailReason::LightningError { ref err } => {
  #[cfg(test)]
 -                                              let (channel_update, payment_retryable, onion_error_code, onion_error_data) = onion_utils::process_onion_failure(&self.secp_ctx, &self.logger, &source, err.data.clone());
 +                                              let (network_update, payment_retryable, onion_error_code, onion_error_data) = onion_utils::process_onion_failure(&self.secp_ctx, &self.logger, &source, err.data.clone());
  #[cfg(not(test))]
 -                                              let (channel_update, payment_retryable, _, _) = onion_utils::process_onion_failure(&self.secp_ctx, &self.logger, &source, err.data.clone());
 +                                              let (network_update, payment_retryable, _, _) = onion_utils::process_onion_failure(&self.secp_ctx, &self.logger, &source, err.data.clone());
                                                // TODO: If we decided to blame ourselves (or one of our channels) in
                                                // process_onion_failure we should close that channel as it implies our
                                                // next-hop is needlessly blaming us!
 -                                              if let Some(update) = channel_update {
 -                                                      self.channel_state.lock().unwrap().pending_msg_events.push(
 -                                                              events::MessageSendEvent::PaymentFailureNetworkUpdate {
 -                                                                      update,
 -                                                              }
 -                                                      );
 -                                              }
                                                self.pending_events.lock().unwrap().push(
 -                                                      events::Event::PaymentFailed {
 +                                                      events::Event::PaymentPathFailed {
                                                                payment_hash: payment_hash.clone(),
                                                                rejected_by_dest: !payment_retryable,
 +                                                              network_update,
 +                                                              all_paths_failed,
 +                                                              path: path.clone(),
  #[cfg(test)]
                                                                error_code: onion_error_code,
  #[cfg(test)]
                                                        ref data,
                                                        .. } => {
                                                // we get a fail_malformed_htlc from the first hop
 -                                              // TODO: We'd like to generate a PaymentFailureNetworkUpdate for temporary
 +                                              // TODO: We'd like to generate a NetworkUpdate for temporary
                                                // failures here, but that would be insufficient as get_route
                                                // generally ignores its view of our own channels as we provide them via
                                                // ChannelDetails.
                                                // TODO: For non-temporary failures, we really should be closing the
                                                // channel here as we apparently can't relay through them anyway.
                                                self.pending_events.lock().unwrap().push(
 -                                                      events::Event::PaymentFailed {
 +                                                      events::Event::PaymentPathFailed {
                                                                payment_hash: payment_hash.clone(),
                                                                rejected_by_dest: path.len() == 1,
 +                                                              network_update: None,
 +                                                              all_paths_failed,
 +                                                              path: path.clone(),
  #[cfg(test)]
                                                                error_code: Some(*failure_code),
  #[cfg(test)]
  
        fn claim_funds_internal(&self, mut channel_state_lock: MutexGuard<ChannelHolder<Signer>>, source: HTLCSource, payment_preimage: PaymentPreimage, forwarded_htlc_value_msat: Option<u64>, from_onchain: bool) {
                match source {
 -                      HTLCSource::OutboundRoute { session_priv, .. } => {
 +                      HTLCSource::OutboundRoute { session_priv, mpp_id, .. } => {
                                mem::drop(channel_state_lock);
 -                              if {
 -                                      let mut session_priv_bytes = [0; 32];
 -                                      session_priv_bytes.copy_from_slice(&session_priv[..]);
 -                                      self.pending_outbound_payments.lock().unwrap().remove(&session_priv_bytes)
 -                              } {
 -                                      let mut pending_events = self.pending_events.lock().unwrap();
 -                                      pending_events.push(events::Event::PaymentSent {
 -                                              payment_preimage
 -                                      });
 +                              let mut session_priv_bytes = [0; 32];
 +                              session_priv_bytes.copy_from_slice(&session_priv[..]);
 +                              let mut outbounds = self.pending_outbound_payments.lock().unwrap();
 +                              let found_payment = if let Some(mut sessions) = outbounds.remove(&mpp_id) {
 +                                      sessions.remove(&session_priv_bytes)
 +                              } else { false };
 +                              if found_payment {
 +                                      self.pending_events.lock().unwrap().push(
 +                                              events::Event::PaymentSent { payment_preimage }
 +                                      );
                                } else {
                                        log_trace!(self.logger, "Received duplicative fulfill for HTLC with payment_preimage {}", log_bytes!(payment_preimage.0));
                                }
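
On the claim side the whole entry is removed at once: the first fulfill for any part of the payment yields a single `PaymentSent`, and later fulfills (e.g. replayed after a reconnect) find nothing. A sketch, with a boolean standing in for the event:

    use std::collections::{HashMap, HashSet};

    // Returns true exactly once per payment, mirroring the single PaymentSent event.
    fn claim(outbounds: &mut HashMap<[u8; 32], HashSet<[u8; 32]>>,
             mpp_id: [u8; 32], session_priv: [u8; 32]) -> bool {
        if let Some(mut sessions) = outbounds.remove(&mpp_id) {
            sessions.remove(&session_priv)
        } else {
            false // duplicative fulfill: payment already claimed
        }
    }

    fn main() {
        let mut outbounds = HashMap::new();
        outbounds.insert([42; 32], [[1u8; 32]].iter().cloned().collect::<HashSet<_>>());
        assert!(claim(&mut outbounds, [42; 32], [1; 32]));  // PaymentSent fires
        assert!(!claim(&mut outbounds, [42; 32], [1; 32])); // duplicate ignored
    }
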
                                                Err(e) => try_chan_entry!(self, Err(e), channel_state, chan),
                                        };
                                        if let Err(e) = self.chain_monitor.watch_channel(chan.get().get_funding_txo().unwrap(), monitor) {
-                                               return_monitor_err!(self, e, channel_state, chan, RAACommitmentOrder::RevokeAndACKFirst, false, false);
+                                               let mut res = handle_monitor_err!(self, e, channel_state, chan, RAACommitmentOrder::RevokeAndACKFirst, false, false);
+                                               if let Err(MsgHandleErrInternal { ref mut shutdown_finish, .. }) = res {
+                                                       // We weren't able to watch the channel to begin with, so no updates should be made on
+                                                       // it. Previously, full_stack_target found an (unreachable) panic when the
+                                                       // monitor update contained within `shutdown_finish` was applied.
+                                                       if let Some((ref mut shutdown_finish, _)) = shutdown_finish {
+                                                               shutdown_finish.0.take();
+                                                       }
+                                               }
+                                               return res
                                        }
                                        funding_tx
                                },
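
This hunk is the fuzzer fix itself: when `watch_channel` fails (here, because a duplicate funding outpoint means the monitor was never installed), the monitor update queued inside the shutdown result must be dropped, since applying an update against a never-watched channel panicked. A reduced sketch of the `Option::take` trick, with stand-in types:

    struct MonitorUpdate; // stand-in for (OutPoint, ChannelMonitorUpdate)
    struct ShutdownResult(Option<MonitorUpdate>, Vec<&'static str>);

    fn main() {
        // Error path: channel is being shut down, update queued for the monitor.
        let mut shutdown_finish = Some((ShutdownResult(Some(MonitorUpdate), Vec::new()), ()));

        // We never managed to watch the channel, so strip the queued update
        // before the shutdown result is processed.
        if let Some((ref mut res, _)) = shutdown_finish {
            res.0.take();
        }
        let (res, _) = shutdown_finish.unwrap();
        assert!(res.0.is_none());
    }
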
                                        msg: update
                                });
                        }
 +                      self.pending_events.lock().unwrap().push(events::Event::ChannelClosed { channel_id: msg.channel_id,  reason: ClosureReason::CooperativeClosure });
                }
                Ok(())
        }
                                }
  
                                let create_pending_htlc_status = |chan: &Channel<Signer>, pending_forward_info: PendingHTLCStatus, error_code: u16| {
 -                                      // Ensure error_code has the UPDATE flag set, since by default we send a
 -                                      // channel update along as part of failing the HTLC.
 -                                      assert!((error_code & 0x1000) != 0);
                                        // If the update_add is completely bogus, the call will Err and we will close,
                                        // but if we've sent a shutdown and they haven't acknowledged it yet, we just
                                        // want to reject the new HTLC and fail it backwards instead of forwarding.
                                        match pending_forward_info {
                                                PendingHTLCStatus::Forward(PendingHTLCInfo { ref incoming_shared_secret, .. }) => {
 -                                                      let reason = if let Ok(upd) = self.get_channel_update_for_unicast(chan) {
 -                                                              onion_utils::build_first_hop_failure_packet(incoming_shared_secret, error_code, &{
 -                                                                      let mut res = Vec::with_capacity(8 + 128);
 -                                                                      // TODO: underspecified, follow https://github.com/lightningnetwork/lightning-rfc/issues/791
 -                                                                      res.extend_from_slice(&byte_utils::be16_to_array(0));
 -                                                                      res.extend_from_slice(&upd.encode_with_len()[..]);
 -                                                                      res
 -                                                              }[..])
 +                                                      let reason = if (error_code & 0x1000) != 0 {
 +                                                              if let Ok(upd) = self.get_channel_update_for_unicast(chan) {
 +                                                                      onion_utils::build_first_hop_failure_packet(incoming_shared_secret, error_code, &{
 +                                                                              let mut res = Vec::with_capacity(8 + 128);
 +                                                                              // TODO: underspecified, follow https://github.com/lightningnetwork/lightning-rfc/issues/791
 +                                                                              res.extend_from_slice(&byte_utils::be16_to_array(0));
 +                                                                              res.extend_from_slice(&upd.encode_with_len()[..]);
 +                                                                              res
 +                                                                      }[..])
 +                                                              } else {
 +                                                                      // The only case where we'd be unable to
 +                                                                      // successfully get a channel update is if the
 +                                                                      // channel isn't in the fully-funded state yet,
 +                                                                      // implying our counterparty is trying to route
 +                                                                      // payments over the channel back to themselves
 +                                                                      // (because no one else should know the short_id
 +                                                                      // is a lightning channel yet). We should have
 +                                                                      // no problem just calling this
 +                                                                      // unknown_next_peer (0x4000|10).
 +                                                                      onion_utils::build_first_hop_failure_packet(incoming_shared_secret, 0x4000|10, &[])
 +                                                              }
                                                        } else {
 -                                                              // The only case where we'd be unable to
 -                                                              // successfully get a channel update is if the
 -                                                              // channel isn't in the fully-funded state yet,
 -                                                              // implying our counterparty is trying to route
 -                                                              // payments over the channel back to themselves
 -                                                              // (cause no one else should know the short_id
 -                                                              // is a lightning channel yet). We should have
 -                                                              // no problem just calling this
 -                                                              // unknown_next_peer (0x4000|10).
 -                                                              onion_utils::build_first_hop_failure_packet(incoming_shared_secret, 0x4000|10, &[])
 +                                                              onion_utils::build_first_hop_failure_packet(incoming_shared_secret, error_code, &[])
                                                        };
                                                        let msg = msgs::UpdateFailHTLC {
                                                                channel_id: msg.channel_id,
                                                self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), htlc_update.source, &htlc_update.payment_hash, HTLCFailReason::Reason { failure_code: 0x4000 | 8, data: Vec::new() });
                                        }
                                },
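
The `create_pending_htlc_status` rework above only attaches a `channel_update` to the failure when the error code carries the UPDATE flag (0x1000), instead of asserting it always does. A sketch of the decision, with the encoded update stubbed as bytes and the framing mirroring (but simplifying) the code above:

    // Build (error_code, failure data) for an onion error, per BOLT 4: only
    // errors with the UPDATE flag (0x1000) carry a channel_update.
    fn failure_data(error_code: u16, channel_update: Option<Vec<u8>>) -> (u16, Vec<u8>) {
        if error_code & 0x1000 != 0 {
            if let Some(upd) = channel_update {
                // Mirrors the code above: a 2-byte zero (underspecified in the
                // spec, see the TODO), then the length-prefixed channel_update.
                let mut res = Vec::with_capacity(4 + upd.len());
                res.extend_from_slice(&0u16.to_be_bytes());
                res.extend_from_slice(&(upd.len() as u16).to_be_bytes());
                res.extend_from_slice(&upd);
                (error_code, res)
            } else {
                // Channel not fully funded yet: fail as unknown_next_peer (0x4000|10).
                (0x4000 | 10, Vec::new())
            }
        } else {
            // Non-UPDATE errors carry no channel_update data.
            (error_code, Vec::new())
        }
    }

    fn main() {
        assert_eq!(failure_data(0x1000 | 7, None).0, 0x4000 | 10);
        assert_eq!(failure_data(0x4000 | 15, None), (0x4000 | 15, Vec::new()));
    }
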
 -                              MonitorEvent::CommitmentTxBroadcasted(funding_outpoint) => {
 +                              MonitorEvent::CommitmentTxConfirmed(funding_outpoint) => {
                                        let mut channel_lock = self.channel_state.lock().unwrap();
                                        let channel_state = &mut *channel_lock;
                                        let by_id = &mut channel_state.by_id;
                                                                msg: update
                                                        });
                                                }
 +                                              self.pending_events.lock().unwrap().push(events::Event::ChannelClosed { channel_id: chan.channel_id(),  reason: ClosureReason::CommitmentTxConfirmed });
                                                pending_msg_events.push(events::MessageSendEvent::HandleError {
                                                        node_id: chan.get_counterparty_node_id(),
                                                        action: msgs::ErrorAction::SendErrorMessage {
                                        Err(e) => {
                                                let (close_channel, res) = convert_chan_err!(self, e, short_to_id, chan, channel_id);
                                                handle_errors.push((chan.get_counterparty_node_id(), Err(res)));
 +                                              // ChannelClosed event is generated by handle_error for us
                                                !close_channel
                                        }
                                }
                                                                });
                                                        }
  
 +                                                      if let Ok(mut pending_events_lock) = self.pending_events.lock() {
 +                                                              pending_events_lock.push(events::Event::ChannelClosed {
 +                                                                      channel_id: *channel_id,
 +                                                                      reason: ClosureReason::CooperativeClosure
 +                                                              });
 +                                                      }
 +
                                                        log_info!(self.logger, "Broadcasting {}", log_tx!(tx));
                                                        self.tx_broadcaster.broadcast_transaction(&tx);
                                                        false
        #[cfg(any(test, feature = "fuzztarget", feature = "_test_utils"))]
        pub fn get_and_clear_pending_events(&self) -> Vec<events::Event> {
                let events = core::cell::RefCell::new(Vec::new());
 -              let event_handler = |event| events.borrow_mut().push(event);
 +              let event_handler = |event: &events::Event| events.borrow_mut().push(event.clone());
                self.process_pending_events(&event_handler);
                events.into_inner()
        }
@@@ -4334,7 -4253,7 +4343,7 @@@ wher
                        }
  
                        for event in pending_events.drain(..) {
 -                              handler.handle_event(event);
 +                              handler.handle_event(&event);
                        }
  
                        result
@@@ -4529,7 -4448,6 +4538,7 @@@ wher
                                                        msg: update
                                                });
                                        }
 +                                      self.pending_events.lock().unwrap().push(events::Event::ChannelClosed { channel_id: channel.channel_id(),  reason: ClosureReason::CommitmentTxConfirmed });
                                        pending_msg_events.push(events::MessageSendEvent::HandleError {
                                                node_id: channel.get_counterparty_node_id(),
                                                action: msgs::ErrorAction::SendErrorMessage { msg: e },
@@@ -4720,7 -4638,6 +4729,7 @@@ impl<Signer: Sign, M: Deref , T: Deref 
                                                                msg: update
                                                        });
                                                }
 +                                              self.pending_events.lock().unwrap().push(events::Event::ChannelClosed { channel_id: chan.channel_id(),  reason: ClosureReason::DisconnectedPeer });
                                                false
                                        } else {
                                                true
                                                        if let Some(short_id) = chan.get_short_channel_id() {
                                                                short_to_id.remove(&short_id);
                                                        }
 +                                                      self.pending_events.lock().unwrap().push(events::Event::ChannelClosed { channel_id: chan.channel_id(),  reason: ClosureReason::DisconnectedPeer });
                                                        return false;
                                                } else {
                                                        no_channels_remain = false;
                                        &events::MessageSendEvent::BroadcastChannelUpdate { .. } => true,
                                        &events::MessageSendEvent::SendChannelUpdate { ref node_id, .. } => node_id != counterparty_node_id,
                                        &events::MessageSendEvent::HandleError { ref node_id, .. } => node_id != counterparty_node_id,
 -                                      &events::MessageSendEvent::PaymentFailureNetworkUpdate { .. } => true,
                                        &events::MessageSendEvent::SendChannelRangeQuery { .. } => false,
                                        &events::MessageSendEvent::SendShortIdsQuery { .. } => false,
                                        &events::MessageSendEvent::SendReplyChannelRange { .. } => false,
                        for chan in self.list_channels() {
                                if chan.counterparty.node_id == *counterparty_node_id {
                                        // Untrusted messages from peer, we throw away the error if id points to a non-existent channel
 -                                      let _ = self.force_close_channel_with_peer(&chan.channel_id, Some(counterparty_node_id));
 +                                      let _ = self.force_close_channel_with_peer(&chan.channel_id, Some(counterparty_node_id), Some(&msg.data));
                                }
                        }
                } else {
                        // Untrusted messages from peer, we throw away the error if id points to a non-existent channel
 -                      let _ = self.force_close_channel_with_peer(&msg.channel_id, Some(counterparty_node_id));
 +                      let _ = self.force_close_channel_with_peer(&msg.channel_id, Some(counterparty_node_id), Some(&msg.data));
                }
        }
  }
@@@ -4933,74 -4850,10 +4942,74 @@@ impl_writeable_tlv_based!(PendingHTLCIn
        (8, outgoing_cltv_value, required)
  });
  
 -impl_writeable_tlv_based_enum!(HTLCFailureMsg, ;
 -      (0, Relay),
 -      (1, Malformed),
 -);
 +
 +impl Writeable for HTLCFailureMsg {
 +      fn write<W: Writer>(&self, writer: &mut W) -> Result<(), io::Error> {
 +              match self {
 +                      HTLCFailureMsg::Relay(msgs::UpdateFailHTLC { channel_id, htlc_id, reason }) => {
 +                              0u8.write(writer)?;
 +                              channel_id.write(writer)?;
 +                              htlc_id.write(writer)?;
 +                              reason.write(writer)?;
 +                      },
 +                      HTLCFailureMsg::Malformed(msgs::UpdateFailMalformedHTLC {
 +                              channel_id, htlc_id, sha256_of_onion, failure_code
 +                      }) => {
 +                              1u8.write(writer)?;
 +                              channel_id.write(writer)?;
 +                              htlc_id.write(writer)?;
 +                              sha256_of_onion.write(writer)?;
 +                              failure_code.write(writer)?;
 +                      },
 +              }
 +              Ok(())
 +      }
 +}
 +
 +impl Readable for HTLCFailureMsg {
 +      fn read<R: Read>(reader: &mut R) -> Result<Self, DecodeError> {
 +              let id: u8 = Readable::read(reader)?;
 +              match id {
 +                      0 => {
 +                              Ok(HTLCFailureMsg::Relay(msgs::UpdateFailHTLC {
 +                                      channel_id: Readable::read(reader)?,
 +                                      htlc_id: Readable::read(reader)?,
 +                                      reason: Readable::read(reader)?,
 +                              }))
 +                      },
 +                      1 => {
 +                              Ok(HTLCFailureMsg::Malformed(msgs::UpdateFailMalformedHTLC {
 +                                      channel_id: Readable::read(reader)?,
 +                                      htlc_id: Readable::read(reader)?,
 +                                      sha256_of_onion: Readable::read(reader)?,
 +                                      failure_code: Readable::read(reader)?,
 +                              }))
 +                      },
 +                      // In versions prior to 0.0.101, HTLCFailureMsg objects were written with type 0 or 1 but
 +                      // weren't length-prefixed and thus didn't support reading the TLV stream suffix of the network
 +                      // messages contained in the variants.
 +                      // In version 0.0.101, support for reading the variants with these types was added, and
 +                      // we should migrate to writing these variants when UpdateFailHTLC or
 +                      // UpdateFailMalformedHTLC get TLV fields.
 +                      2 => {
 +                              let length: BigSize = Readable::read(reader)?;
 +                              let mut s = FixedLengthReader::new(reader, length.0);
 +                              let res = Readable::read(&mut s)?;
 +                              s.eat_remaining()?; // Return ShortRead if there aren't actually enough bytes
 +                              Ok(HTLCFailureMsg::Relay(res))
 +                      },
 +                      3 => {
 +                              let length: BigSize = Readable::read(reader)?;
 +                              let mut s = FixedLengthReader::new(reader, length.0);
 +                              let res = Readable::read(&mut s)?;
 +                              s.eat_remaining()?; // Return ShortRead if there aren't actually enough bytes
 +                              Ok(HTLCFailureMsg::Malformed(res))
 +                      },
 +                      _ => Err(DecodeError::UnknownRequiredFeature),
 +              }
 +      }
 +}
 +
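
A sketch of the length-prefixed variant scheme above, with `std::io::Read::take` standing in for `FixedLengthReader` and a plain big-endian `u64` standing in for `BigSize` (both assumptions for readability, not LDK's wire format):

    use std::io::{self, Read};

    fn read_u64<R: Read>(r: &mut R) -> io::Result<u64> {
        let mut b = [0u8; 8];
        r.read_exact(&mut b)?;
        Ok(u64::from_be_bytes(b))
    }

    // Read a length-prefixed payload: consume exactly `len` bytes, erroring if
    // fewer are available, and discard any suffix (future TLV fields).
    fn read_length_prefixed<R: Read>(r: &mut R, used: usize) -> io::Result<Vec<u8>> {
        let len = read_u64(r)?;
        let mut bounded = r.take(len);
        let mut payload = vec![0u8; used.min(len as usize)];
        bounded.read_exact(&mut payload)?;
        // Equivalent of s.eat_remaining(): drain the unread suffix so the outer
        // stream is positioned just past this variant.
        io::copy(&mut bounded, &mut io::sink())?;
        Ok(payload)
    }

    fn main() -> io::Result<()> {
        let mut buf = Vec::new();
        buf.extend_from_slice(&5u64.to_be_bytes()); // length prefix
        buf.extend_from_slice(b"relay"); // payload: 3 used + 2 suffix bytes
        let payload = read_length_prefixed(&mut buf.as_slice(), 3)?;
        assert_eq!(&payload, b"rel");
        Ok(())
    }
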
  impl_writeable_tlv_based_enum!(PendingHTLCStatus, ;
        (0, Forward),
        (1, Fail),
@@@ -5071,60 -4924,14 +5080,60 @@@ impl Readable for ClaimableHTLC 
        }
  }
  
 -impl_writeable_tlv_based_enum!(HTLCSource,
 -      (0, OutboundRoute) => {
 -              (0, session_priv, required),
 -              (2, first_hop_htlc_msat, required),
 -              (4, path, vec_type),
 -      }, ;
 -      (1, PreviousHopData)
 -);
 +impl Readable for HTLCSource {
 +      fn read<R: Read>(reader: &mut R) -> Result<Self, DecodeError> {
 +              let id: u8 = Readable::read(reader)?;
 +              match id {
 +                      0 => {
 +                              let mut session_priv: ::util::ser::OptionDeserWrapper<SecretKey> = ::util::ser::OptionDeserWrapper(None);
 +                              let mut first_hop_htlc_msat: u64 = 0;
 +                              let mut path = Some(Vec::new());
 +                              let mut mpp_id = None;
 +                              read_tlv_fields!(reader, {
 +                                      (0, session_priv, required),
 +                                      (1, mpp_id, option),
 +                                      (2, first_hop_htlc_msat, required),
 +                                      (4, path, vec_type),
 +                              });
 +                              if mpp_id.is_none() {
 +                                      // For backwards compat, if there was no mpp_id written, use the session_priv bytes
 +                                      // instead.
 +                                      mpp_id = Some(MppId(*session_priv.0.unwrap().as_ref()));
 +                              }
 +                              Ok(HTLCSource::OutboundRoute {
 +                                      session_priv: session_priv.0.unwrap(),
 +                                      first_hop_htlc_msat: first_hop_htlc_msat,
 +                                      path: path.unwrap(),
 +                                      mpp_id: mpp_id.unwrap(),
 +                              })
 +                      }
 +                      1 => Ok(HTLCSource::PreviousHopData(Readable::read(reader)?)),
 +                      _ => Err(DecodeError::UnknownRequiredFeature),
 +              }
 +      }
 +}
 +
 +impl Writeable for HTLCSource {
 +      fn write<W: Writer>(&self, writer: &mut W) -> Result<(), ::io::Error> {
 +              match self {
 +                      HTLCSource::OutboundRoute { ref session_priv, ref first_hop_htlc_msat, ref path, mpp_id } => {
 +                              0u8.write(writer)?;
 +                              let mpp_id_opt = Some(mpp_id);
 +                              write_tlv_fields!(writer, {
 +                                      (0, session_priv, required),
 +                                      (1, mpp_id_opt, option),
 +                                      (2, first_hop_htlc_msat, required),
 +                                      (4, path, vec_type),
 +                               });
 +                      }
 +                      HTLCSource::PreviousHopData(ref field) => {
 +                              1u8.write(writer)?;
 +                              field.write(writer)?;
 +                      }
 +              }
 +              Ok(())
 +      }
 +}
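
The backwards-compat trick above: old serializations carry no odd-typed `mpp_id` TLV, so on read the id is synthesized from the `session_priv` bytes, which were previously the unique per-payment key anyway. A sketch:

    fn main() {
        let session_priv: [u8; 32] = [7; 32];

        // Pre-0.0.101 data: no mpp_id TLV present.
        let read_mpp_id: Option<[u8; 32]> = None;

        // Fall back to the session_priv bytes, which uniquely identified the
        // payment's HTLC in old versions.
        let mpp_id = read_mpp_id.unwrap_or(session_priv);
        assert_eq!(mpp_id, session_priv);
    }
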
  
  impl_writeable_tlv_based_enum!(HTLCFailReason,
        (0, LightningError) => {
@@@ -5245,21 -5052,12 +5254,21 @@@ impl<Signer: Sign, M: Deref, T: Deref, 
                }
  
                let pending_outbound_payments = self.pending_outbound_payments.lock().unwrap();
 -              (pending_outbound_payments.len() as u64).write(writer)?;
 -              for session_priv in pending_outbound_payments.iter() {
 -                      session_priv.write(writer)?;
 +              // For backwards compat, write the session privs and their total length.
 +              let mut num_pending_outbounds_compat: u64 = 0;
 +              for (_, outbounds) in pending_outbound_payments.iter() {
 +                      num_pending_outbounds_compat += outbounds.len() as u64;
 +              }
 +              num_pending_outbounds_compat.write(writer)?;
 +              for (_, outbounds) in pending_outbound_payments.iter() {
 +                      for outbound in outbounds.iter() {
 +                              outbound.write(writer)?;
 +                      }
                }
  
 -              write_tlv_fields!(writer, {});
 +              write_tlv_fields!(writer, {
 +                      (1, pending_outbound_payments, required),
 +              });
  
                Ok(())
        }
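
On the write side, old readers still see the flat count-plus-entries encoding while new readers get the structured map from the TLV suffix. A std-only sketch of producing the legacy prefix (the TLV portion is elided):

    use std::collections::{HashMap, HashSet};

    fn main() {
        let mut pending: HashMap<[u8; 32], HashSet<[u8; 32]>> = HashMap::new();
        pending.insert([42; 32], [[1u8; 32], [2u8; 32]].iter().cloned().collect());

        let mut out = Vec::new();

        // Legacy prefix: total number of session_privs across all payments...
        let num_compat: u64 = pending.values().map(|s| s.len() as u64).sum();
        out.extend_from_slice(&num_compat.to_be_bytes());
        // ...followed by each session_priv, flat, as pre-0.0.101 wrote them.
        for sessions in pending.values() {
            for session_priv in sessions {
                out.extend_from_slice(session_priv);
            }
        }
        // The structured map would then be appended as TLV type 1
        // (write_tlv_fields! above); elided here.
        assert_eq!(out.len(), 8 + 2 * 32);
    }
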
@@@ -5396,7 -5194,6 +5405,7 @@@ impl<'a, Signer: Sign, M: Deref, T: Der
                let mut funding_txo_set = HashSet::with_capacity(cmp::min(channel_count as usize, 128));
                let mut by_id = HashMap::with_capacity(cmp::min(channel_count as usize, 128));
                let mut short_to_id = HashMap::with_capacity(cmp::min(channel_count as usize, 128));
 +              let mut channel_closures = Vec::new();
                for _ in 0..channel_count {
                        let mut channel: Channel<Signer> = Channel::read(reader, &args.keys_manager)?;
                        let funding_txo = channel.get_funding_txo().ok_or(DecodeError::InvalidValue)?;
                                        let (_, mut new_failed_htlcs) = channel.force_shutdown(true);
                                        failed_htlcs.append(&mut new_failed_htlcs);
                                        monitor.broadcast_latest_holder_commitment_txn(&args.tx_broadcaster, &args.logger);
 +                                      channel_closures.push(events::Event::ChannelClosed {
 +                                              channel_id: channel.channel_id(),
 +                                              reason: ClosureReason::OutdatedChannelManager
 +                                      });
                                } else {
                                        if let Some(short_channel_id) = channel.get_short_channel_id() {
                                                short_to_id.insert(short_channel_id, channel.channel_id());
                                None => continue,
                        }
                }
 +              if forward_htlcs_count > 0 {
 +                      // If we have pending HTLCs to forward, assume we either dropped a
 +                      // `PendingHTLCsForwardable` or the user received it but never processed it as they
 +                      // shut down before the timer hit. Either way, set the time_forwardable to a small
 +                      // constant as enough time has likely passed that we should simply handle the forwards
 +                      // now, or at least after the user gets a chance to reconnect to our peers.
 +                      pending_events_read.push(events::Event::PendingHTLCsForwardable {
 +                              time_forwardable: Duration::from_secs(2),
 +                      });
 +              }
  
                let background_event_count: u64 = Readable::read(reader)?;
                let mut pending_background_events_read: Vec<BackgroundEvent> = Vec::with_capacity(cmp::min(background_event_count as usize, MAX_ALLOC_SIZE/mem::size_of::<BackgroundEvent>()));
                        }
                }
  
 -              let pending_outbound_payments_count: u64 = Readable::read(reader)?;
 -              let mut pending_outbound_payments: HashSet<[u8; 32]> = HashSet::with_capacity(cmp::min(pending_outbound_payments_count as usize, MAX_ALLOC_SIZE/32));
 -              for _ in 0..pending_outbound_payments_count {
 -                      if !pending_outbound_payments.insert(Readable::read(reader)?) {
 -                              return Err(DecodeError::InvalidValue);
 -                      }
 +              let pending_outbound_payments_count_compat: u64 = Readable::read(reader)?;
 +              let mut pending_outbound_payments_compat: HashMap<MppId, HashSet<[u8; 32]>> =
 +                      HashMap::with_capacity(cmp::min(pending_outbound_payments_count_compat as usize, MAX_ALLOC_SIZE/32));
 +              for _ in 0..pending_outbound_payments_count_compat {
 +                      let session_priv = Readable::read(reader)?;
 +                      if pending_outbound_payments_compat.insert(MppId(session_priv), [session_priv].iter().cloned().collect()).is_some() {
 +                              return Err(DecodeError::InvalidValue)
 +                      };
                }
  
 -              read_tlv_fields!(reader, {});
 +              let mut pending_outbound_payments = None;
 +              read_tlv_fields!(reader, {
 +                      (1, pending_outbound_payments, option),
 +              });
 +              if pending_outbound_payments.is_none() {
 +                      pending_outbound_payments = Some(pending_outbound_payments_compat);
 +              }
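The read side mirrors the write path: the legacy count-prefixed session keys are first parsed into a compat map (one single-entry set per key, with the MppId equal to the session key bytes), and the TLV field, when present, takes precedence. A condensed sketch of that fallback, with plain 32-byte arrays standing in for MppId and the real TLV plumbing elided:

    use std::collections::{HashMap, HashSet};

    // Hypothetical helper: prefer the map written by newer serializers,
    // otherwise rebuild one from the legacy flat list of session keys.
    fn resolve_outbounds(
        legacy_session_privs: Vec<[u8; 32]>,
        tlv_map: Option<HashMap<[u8; 32], HashSet<[u8; 32]>>>,
    ) -> HashMap<[u8; 32], HashSet<[u8; 32]>> {
        tlv_map.unwrap_or_else(|| {
            legacy_session_privs
                .into_iter()
                // Each legacy entry becomes its own single-path payment,
                // keyed by an id equal to the session key itself.
                .map(|sp| (sp, [sp].iter().cloned().collect()))
                .collect()
        })
    }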
  
                let mut secp_ctx = Secp256k1::new();
                secp_ctx.seeded_randomize(&args.keys_manager.get_secure_random_bytes());
  
 +              if !channel_closures.is_empty() {
 +                      pending_events_read.append(&mut channel_closures);
 +              }
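Queued this way, the closures reach the user through the normal event pipeline on the first events pass after reload. A hypothetical consumer-side match, mirroring the event as constructed above:

    // Sketch only: `event` comes from an assumed event-processing loop.
    match event {
        Event::ChannelClosed { channel_id, reason: ClosureReason::OutdatedChannelManager } => {
            // The persisted ChannelManager lagged its ChannelMonitor, so
            // this channel was force-closed during deserialization.
            eprintln!("channel {:?} force-closed: stale manager on restart", channel_id);
        },
        _ => {},
    }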
 +
                let channel_manager = ChannelManager {
                        genesis_hash,
                        fee_estimator: args.fee_estimator,
                                pending_msg_events: Vec::new(),
                        }),
                        pending_inbound_payments: Mutex::new(pending_inbound_payments),
 -                      pending_outbound_payments: Mutex::new(pending_outbound_payments),
 +                      pending_outbound_payments: Mutex::new(pending_outbound_payments.unwrap()),
  
                        our_network_key: args.keys_manager.get_node_secret(),
                        our_network_pubkey: PublicKey::from_secret_key(&secp_ctx, &args.keys_manager.get_node_secret()),
@@@ -5606,7 -5377,7 +5615,7 @@@ mod tests 
        use bitcoin::hashes::sha256::Hash as Sha256;
        use core::time::Duration;
        use ln::{PaymentPreimage, PaymentHash, PaymentSecret};
 -      use ln::channelmanager::PaymentSendFailure;
 +      use ln::channelmanager::{MppId, PaymentSendFailure};
        use ln::features::{InitFeatures, InvoiceFeatures};
        use ln::functional_test_utils::*;
        use ln::msgs;
  
                // First, send a partial MPP payment.
                let net_graph_msg_handler = &nodes[0].net_graph_msg_handler;
 -              let route = get_route(&nodes[0].node.get_our_node_id(), &net_graph_msg_handler.network_graph.read().unwrap(), &nodes[1].node.get_our_node_id(), Some(InvoiceFeatures::known()), None, &Vec::new(), 100_000, TEST_FINAL_CLTV, &logger).unwrap();
 +              let route = get_route(&nodes[0].node.get_our_node_id(), &net_graph_msg_handler.network_graph, &nodes[1].node.get_our_node_id(), Some(InvoiceFeatures::known()), None, &Vec::new(), 100_000, TEST_FINAL_CLTV, &logger).unwrap();
                let (payment_preimage, our_payment_hash, payment_secret) = get_payment_preimage_hash!(&nodes[1]);
 +              let mpp_id = MppId([42; 32]);
                // Use the utility function send_payment_along_path to send the payment with MPP data,
                // which indicates there are more HTLCs coming.
                let cur_height = CHAN_CONFIRM_DEPTH + 1; // route_payment calls send_payment, which adds 1 to the current height. So we do the same here to match.
 -              nodes[0].node.send_payment_along_path(&route.paths[0], &our_payment_hash, &Some(payment_secret), 200_000, cur_height, &None).unwrap();
 +              nodes[0].node.send_payment_along_path(&route.paths[0], &our_payment_hash, &Some(payment_secret), 200_000, cur_height, mpp_id, &None).unwrap();
                check_added_monitors!(nodes[0], 1);
                let mut events = nodes[0].node.get_and_clear_pending_msg_events();
                assert_eq!(events.len(), 1);
                expect_payment_failed!(nodes[0], our_payment_hash, true);
  
                // Send the second half of the original MPP payment.
 -              nodes[0].node.send_payment_along_path(&route.paths[0], &our_payment_hash, &Some(payment_secret), 200_000, cur_height, &None).unwrap();
 +              nodes[0].node.send_payment_along_path(&route.paths[0], &our_payment_hash, &Some(payment_secret), 200_000, cur_height, mpp_id, &None).unwrap();
                check_added_monitors!(nodes[0], 1);
                let mut events = nodes[0].node.get_and_clear_pending_msg_events();
                assert_eq!(events.len(), 1);
                nodes[0].node.handle_revoke_and_ack(&nodes[1].node.get_our_node_id(), &bs_third_raa);
                check_added_monitors!(nodes[0], 1);
  
 -              // There's an existing bug that generates a PaymentSent event for each MPP path, so handle that here.
 +              // Note that successful MPP payments will generate one event upon the first path's success. No
 +              // further events will be generated for subsequent path successes.
                let events = nodes[0].node.get_and_clear_pending_events();
                match events[0] {
                        Event::PaymentSent { payment_preimage: ref preimage } => {
                        },
                        _ => panic!("Unexpected event"),
                }
 -              match events[1] {
 -                      Event::PaymentSent { payment_preimage: ref preimage } => {
 -                              assert_eq!(payment_preimage, *preimage);
 -                      },
 -                      _ => panic!("Unexpected event"),
 -              }
        }
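Because a successful MPP payment now produces exactly one PaymentSent, a consumer can treat that event as terminal without de-duplicating per path. A short sketch under that guarantee, where `pending` is a hypothetical HashSet<PaymentHash> of in-flight payments tracked by the application:

    use bitcoin::hashes::{Hash, sha256::Hash as Sha256};

    // Sketch only; `event` and `pending` are assumed to exist in the
    // surrounding application state.
    match event {
        Event::PaymentSent { payment_preimage } => {
            let payment_hash = PaymentHash(Sha256::hash(&payment_preimage.0).into_inner());
            // Exactly one PaymentSent per payment, even across MPP paths,
            // so a single removal suffices.
            assert!(pending.remove(&payment_hash));
        },
        _ => {},
    }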
  
        #[test]
                let (payment_preimage, payment_hash, _) = route_payment(&nodes[0], &expected_route, 100_000);
  
                // Next, attempt a keysend payment and make sure it fails.
 -              let route = get_route(&nodes[0].node.get_our_node_id(), &nodes[0].net_graph_msg_handler.network_graph.read().unwrap(), &expected_route.last().unwrap().node.get_our_node_id(), Some(InvoiceFeatures::known()), None, &Vec::new(), 100_000, TEST_FINAL_CLTV, &logger).unwrap();
 +              let route = get_route(&nodes[0].node.get_our_node_id(), &nodes[0].net_graph_msg_handler.network_graph, &expected_route.last().unwrap().node.get_our_node_id(), Some(InvoiceFeatures::known()), None, &Vec::new(), 100_000, TEST_FINAL_CLTV, &logger).unwrap();
                nodes[0].node.send_spontaneous_payment(&route, Some(payment_preimage)).unwrap();
                check_added_monitors!(nodes[0], 1);
                let mut events = nodes[0].node.get_and_clear_pending_msg_events();
  
                // To start (2), send a keysend payment but don't claim it.
                let payment_preimage = PaymentPreimage([42; 32]);
 -              let route = get_route(&nodes[0].node.get_our_node_id(), &nodes[0].net_graph_msg_handler.network_graph.read().unwrap(), &expected_route.last().unwrap().node.get_our_node_id(), Some(InvoiceFeatures::known()), None, &Vec::new(), 100_000, TEST_FINAL_CLTV, &logger).unwrap();
 +              let route = get_route(&nodes[0].node.get_our_node_id(), &nodes[0].net_graph_msg_handler.network_graph, &expected_route.last().unwrap().node.get_our_node_id(), Some(InvoiceFeatures::known()), None, &Vec::new(), 100_000, TEST_FINAL_CLTV, &logger).unwrap();
                let payment_hash = nodes[0].node.send_spontaneous_payment(&route, Some(payment_preimage)).unwrap();
                check_added_monitors!(nodes[0], 1);
                let mut events = nodes[0].node.get_and_clear_pending_msg_events();
                nodes[1].node.peer_connected(&payer_pubkey, &msgs::Init { features: InitFeatures::known() });
  
                let _chan = create_chan_between_nodes(&nodes[0], &nodes[1], InitFeatures::known(), InitFeatures::known());
 -              let network_graph = nodes[0].net_graph_msg_handler.network_graph.read().unwrap();
 +              let network_graph = &nodes[0].net_graph_msg_handler.network_graph;
                let first_hops = nodes[0].node.list_usable_channels();
 -              let route = get_keysend_route(&payer_pubkey, &network_graph, &payee_pubkey,
 +              let route = get_keysend_route(&payer_pubkey, network_graph, &payee_pubkey,
                                    Some(&first_hops.iter().collect::<Vec<_>>()), &vec![], 10000, 40,
                                    nodes[0].logger).unwrap();
  
                nodes[1].node.peer_connected(&payer_pubkey, &msgs::Init { features: InitFeatures::known() });
  
                let _chan = create_chan_between_nodes(&nodes[0], &nodes[1], InitFeatures::known(), InitFeatures::known());
 -              let network_graph = nodes[0].net_graph_msg_handler.network_graph.read().unwrap();
 +              let network_graph = &nodes[0].net_graph_msg_handler.network_graph;
                let first_hops = nodes[0].node.list_usable_channels();
 -              let route = get_keysend_route(&payer_pubkey, &network_graph, &payee_pubkey,
 +              let route = get_keysend_route(&payer_pubkey, network_graph, &payee_pubkey,
                                    Some(&first_hops.iter().collect::<Vec<_>>()), &vec![], 10000, 40,
                                    nodes[0].logger).unwrap();
  
                // Marshall an MPP route.
                let (_, payment_hash, _) = get_payment_preimage_hash!(&nodes[3]);
                let net_graph_msg_handler = &nodes[0].net_graph_msg_handler;
 -              let mut route = get_route(&nodes[0].node.get_our_node_id(), &net_graph_msg_handler.network_graph.read().unwrap(), &nodes[3].node.get_our_node_id(), Some(InvoiceFeatures::known()), None, &[], 100000, TEST_FINAL_CLTV, &logger).unwrap();
 +              let mut route = get_route(&nodes[0].node.get_our_node_id(), &net_graph_msg_handler.network_graph, &nodes[3].node.get_our_node_id(), Some(InvoiceFeatures::known()), None, &[], 100000, TEST_FINAL_CLTV, &logger).unwrap();
                let path = route.paths[0].clone();
                route.paths.push(path);
                route.paths[0][0].pubkey = nodes[1].node.get_our_node_id();