Merge pull request #1413 from ViktorTigerstrom/2022-04-default-to-bolt4-tlv-onions
[rust-lightning] / lightning / src / ln / channelmanager.rs
index 570f04b628a7a5590d93de0b37edc43b5a5af32f..ae8625d89f4f60eaba2fe8127a5b957a75b5d301 100644 (file)
@@ -53,7 +53,7 @@ use crate::ln::msgs::{ChannelMessageHandler, DecodeError, LightningError, MAX_VA
 use crate::ln::wire::Encode;
 use crate::chain::keysinterface::{Sign, KeysInterface, KeysManager, Recipient};
 use crate::util::config::{UserConfig, ChannelConfig};
-use crate::util::events::{EventHandler, EventsProvider, MessageSendEvent, MessageSendEventsProvider, ClosureReason, HTLCDestination};
+use crate::util::events::{Event, EventHandler, EventsProvider, MessageSendEvent, MessageSendEventsProvider, ClosureReason, HTLCDestination};
 use crate::util::{byte_utils, events};
 use crate::util::wakers::{Future, Notifier};
 use crate::util::scid_utils::fake_scid;
@@ -112,7 +112,8 @@ pub(super) struct PendingHTLCInfo {
        pub(super) routing: PendingHTLCRouting,
        pub(super) incoming_shared_secret: [u8; 32],
        payment_hash: PaymentHash,
-       pub(super) amt_to_forward: u64,
+       pub(super) incoming_amt_msat: Option<u64>, // Added in 0.0.113
+       pub(super) outgoing_amt_msat: u64,
        pub(super) outgoing_cltv_value: u32,
 }
 
@@ -129,20 +130,22 @@ pub(super) enum PendingHTLCStatus {
        Fail(HTLCFailureMsg),
 }
 
-pub(super) enum HTLCForwardInfo {
-       AddHTLC {
-               forward_info: PendingHTLCInfo,
+pub(super) struct PendingAddHTLCInfo {
+       pub(super) forward_info: PendingHTLCInfo,
 
-               // These fields are produced in `forward_htlcs()` and consumed in
-               // `process_pending_htlc_forwards()` for constructing the
-               // `HTLCSource::PreviousHopData` for failed and forwarded
-               // HTLCs.
-               //
-               // Note that this may be an outbound SCID alias for the associated channel.
-               prev_short_channel_id: u64,
-               prev_htlc_id: u64,
-               prev_funding_outpoint: OutPoint,
-       },
+       // These fields are produced in `forward_htlcs()` and consumed in
+       // `process_pending_htlc_forwards()` for constructing the
+       // `HTLCSource::PreviousHopData` for failed and forwarded
+       // HTLCs.
+       //
+       // Note that this may be an outbound SCID alias for the associated channel.
+       prev_short_channel_id: u64,
+       prev_htlc_id: u64,
+       prev_funding_outpoint: OutPoint,
+}
+
+pub(super) enum HTLCForwardInfo {
+       AddHTLC(PendingAddHTLCInfo),
        FailHTLC {
                htlc_id: u64,
                err_packet: msgs::OnionErrorPacket,
@@ -1348,7 +1351,7 @@ macro_rules! convert_chan_err {
 }
 
 macro_rules! break_chan_entry {
-       ($self: ident, $res: expr, $channel_state: expr, $entry: expr) => {
+       ($self: ident, $res: expr, $entry: expr) => {
                match $res {
                        Ok(res) => res,
                        Err(e) => {
@@ -1363,7 +1366,7 @@ macro_rules! break_chan_entry {
 }
 
 macro_rules! try_chan_entry {
-       ($self: ident, $res: expr, $channel_state: expr, $entry: expr) => {
+       ($self: ident, $res: expr, $entry: expr) => {
                match $res {
                        Ok(res) => res,
                        Err(e) => {
@@ -1378,7 +1381,7 @@ macro_rules! try_chan_entry {
 }
 
 macro_rules! remove_channel {
-       ($self: expr, $channel_state: expr, $entry: expr) => {
+       ($self: expr, $entry: expr) => {
                {
                        let channel = $entry.remove_entry().1;
                        update_maps_on_chan_removal!($self, channel);
@@ -1919,7 +1922,7 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F
                                                let (result, is_permanent) =
                                                        handle_monitor_update_res!(self, update_res, chan_entry.get_mut(), RAACommitmentOrder::CommitmentFirst, chan_entry.key(), NO_UPDATE);
                                                if is_permanent {
-                                                       remove_channel!(self, channel_state, chan_entry);
+                                                       remove_channel!(self, chan_entry);
                                                        break result;
                                                }
                                        }
@@ -1930,7 +1933,7 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F
                                        });
 
                                        if chan_entry.get().is_shutdown() {
-                                               let channel = remove_channel!(self, channel_state, chan_entry);
+                                               let channel = remove_channel!(self, chan_entry);
                                                if let Ok(channel_update) = self.get_channel_update_for_broadcast(&channel) {
                                                        channel_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
                                                                msg: channel_update
@@ -2031,7 +2034,7 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F
                                } else {
                                        self.issue_channel_close_events(chan.get(),ClosureReason::HolderForceClosed);
                                }
-                               remove_channel!(self, channel_state, chan)
+                               remove_channel!(self, chan)
                        } else {
                                return Err(APIError::ChannelUnavailable{err: "No such channel".to_owned()});
                        }
@@ -2135,13 +2138,6 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F
                }
 
                let routing = match hop_data.format {
-                       msgs::OnionHopDataFormat::Legacy { .. } => {
-                               return Err(ReceiveError {
-                                       err_code: 0x4000|0x2000|3,
-                                       err_data: Vec::new(),
-                                       msg: "We require payment_secrets",
-                               });
-                       },
                        msgs::OnionHopDataFormat::NonFinalNode { .. } => {
                                return Err(ReceiveError {
                                        err_code: 0x4000|22,
@@ -2194,7 +2190,8 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F
                        routing,
                        payment_hash,
                        incoming_shared_secret: shared_secret,
-                       amt_to_forward: amt_msat,
+                       incoming_amt_msat: Some(amt_msat),
+                       outgoing_amt_msat: amt_msat,
                        outgoing_cltv_value: hop_data.outgoing_cltv_value,
                })
        }
@@ -2276,7 +2273,6 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F
                                };
 
                                let short_channel_id = match next_hop_data.format {
-                                       msgs::OnionHopDataFormat::Legacy { short_channel_id } => short_channel_id,
                                        msgs::OnionHopDataFormat::NonFinalNode { short_channel_id } => short_channel_id,
                                        msgs::OnionHopDataFormat::FinalNode { .. } => {
                                                return_err!("Final Node OnionHopData provided for us as an intermediary node", 0x4000 | 22, &[0;0]);
@@ -2290,13 +2286,14 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F
                                        },
                                        payment_hash: msg.payment_hash.clone(),
                                        incoming_shared_secret: shared_secret,
-                                       amt_to_forward: next_hop_data.amt_to_forward,
+                                       incoming_amt_msat: Some(msg.amount_msat),
+                                       outgoing_amt_msat: next_hop_data.amt_to_forward,
                                        outgoing_cltv_value: next_hop_data.outgoing_cltv_value,
                                })
                        }
                };
 
-               if let &PendingHTLCStatus::Forward(PendingHTLCInfo { ref routing, ref amt_to_forward, ref outgoing_cltv_value, .. }) = &pending_forward_info {
+               if let &PendingHTLCStatus::Forward(PendingHTLCInfo { ref routing, ref outgoing_amt_msat, ref outgoing_cltv_value, .. }) = &pending_forward_info {
                        // If short_channel_id is 0 here, we'll reject the HTLC as there cannot be a channel
                        // with a short_channel_id of 0. This is important as various things later assume
                        // short_channel_id is non-0 in any ::Forward.
@@ -2308,7 +2305,7 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F
                                                None => { // unknown_next_peer
                                                        // Note that this is likely a timing oracle for detecting whether an scid is a
                                                        // phantom.
-                                                       if fake_scid::is_valid_phantom(&self.fake_scid_rand_bytes, *short_channel_id) {
+                                                       if fake_scid::is_valid_phantom(&self.fake_scid_rand_bytes, *short_channel_id, &self.genesis_hash) {
                                                                None
                                                        } else {
                                                                break Some(("Don't have available channel for forwarding as requested.", 0x4000 | 10, None));
@@ -2317,7 +2314,14 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F
                                                Some((_cp_id, chan_id)) => Some(chan_id.clone()),
                                        };
                                        let chan_update_opt = if let Some(forwarding_id) = forwarding_id_opt {
-                                               let chan = channel_state.by_id.get_mut(&forwarding_id).unwrap();
+                                               let chan = match channel_state.by_id.get_mut(&forwarding_id) {
+                                                       None => {
+                                                               // Channel was removed. The short_to_chan_info and by_id maps have
+                                                               // no consistency guarantees.
+                                                               break Some(("Don't have available channel for forwarding as requested.", 0x4000 | 10, None));
+                                                       },
+                                                       Some(chan) => chan
+                                               };
                                                if !chan.should_announce() && !self.default_configuration.accept_forwards_to_priv_channels {
                                                        // Note that the behavior here should be identical to the above block - we
                                                        // should NOT reveal the existence or non-existence of a private channel if
@@ -2340,10 +2344,10 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F
                                                if !chan.is_live() { // channel_disabled
                                                        break Some(("Forwarding channel is not in a ready state.", 0x1000 | 20, chan_update_opt));
                                                }
-                                               if *amt_to_forward < chan.get_counterparty_htlc_minimum_msat() { // amount_below_minimum
+                                               if *outgoing_amt_msat < chan.get_counterparty_htlc_minimum_msat() { // amount_below_minimum
                                                        break Some(("HTLC amount was below the htlc_minimum_msat", 0x1000 | 11, chan_update_opt));
                                                }
-                                               if let Err((err, code)) = chan.htlc_satisfies_config(&msg, *amt_to_forward, *outgoing_cltv_value) {
+                                               if let Err((err, code)) = chan.htlc_satisfies_config(&msg, *outgoing_amt_msat, *outgoing_cltv_value) {
                                                        break Some((err, code, chan_update_opt));
                                                }
                                                chan_update_opt
@@ -2506,7 +2510,7 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F
                                                        payment_secret: payment_secret.clone(),
                                                        payment_params: payment_params.clone(),
                                                }, onion_packet, &self.logger),
-                                       channel_state, chan)
+                                               chan)
                                } {
                                        Some((update_add, commitment_signed, monitor_update)) => {
                                                let update_err = self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), monitor_update);
@@ -2544,7 +2548,12 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F
                                        },
                                        None => { },
                                }
-                       } else { unreachable!(); }
+                       } else {
+                               // The channel was likely removed after we fetched the id from the
+                               // `short_to_chan_info` map, but before we successfully locked the `by_id` map.
+                               // This can occur as no consistency guarantees exists between the two maps.
+                               return Err(APIError::ChannelUnavailable{err: "No channel available with first hop!".to_owned()});
+                       }
                        return Ok(());
                };
 
@@ -3133,87 +3142,90 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F
                                let mut channel_state_lock = self.channel_state.lock().unwrap();
                                let channel_state = &mut *channel_state_lock;
                                if short_chan_id != 0 {
-                                       let forward_chan_id = match self.short_to_chan_info.read().unwrap().get(&short_chan_id) {
-                                               Some((_cp_id, chan_id)) => chan_id.clone(),
-                                               None => {
+                                       macro_rules! forwarding_channel_not_found {
+                                               () => {
                                                        for forward_info in pending_forwards.drain(..) {
                                                                match forward_info {
-                                                                       HTLCForwardInfo::AddHTLC { prev_short_channel_id, prev_htlc_id, forward_info: PendingHTLCInfo {
-                                                                               routing, incoming_shared_secret, payment_hash, amt_to_forward, outgoing_cltv_value },
-                                                                               prev_funding_outpoint } => {
-                                                                                       macro_rules! failure_handler {
-                                                                                               ($msg: expr, $err_code: expr, $err_data: expr, $phantom_ss: expr, $next_hop_unknown: expr) => {
-                                                                                                       log_info!(self.logger, "Failed to accept/forward incoming HTLC: {}", $msg);
-
-                                                                                                       let htlc_source = HTLCSource::PreviousHopData(HTLCPreviousHopData {
-                                                                                                               short_channel_id: prev_short_channel_id,
-                                                                                                               outpoint: prev_funding_outpoint,
-                                                                                                               htlc_id: prev_htlc_id,
-                                                                                                               incoming_packet_shared_secret: incoming_shared_secret,
-                                                                                                               phantom_shared_secret: $phantom_ss,
-                                                                                                       });
-
-                                                                                                       let reason = if $next_hop_unknown {
-                                                                                                               HTLCDestination::UnknownNextHop { requested_forward_scid: short_chan_id }
-                                                                                                       } else {
-                                                                                                               HTLCDestination::FailedPayment{ payment_hash }
-                                                                                                       };
-
-                                                                                                       failed_forwards.push((htlc_source, payment_hash,
-                                                                                                               HTLCFailReason::Reason { failure_code: $err_code, data: $err_data },
-                                                                                                               reason
-                                                                                                       ));
-                                                                                                       continue;
-                                                                                               }
+                                                                       HTLCForwardInfo::AddHTLC(PendingAddHTLCInfo {
+                                                                               prev_short_channel_id, prev_htlc_id, prev_funding_outpoint,
+                                                                               forward_info: PendingHTLCInfo {
+                                                                                       routing, incoming_shared_secret, payment_hash, outgoing_amt_msat,
+                                                                                       outgoing_cltv_value, incoming_amt_msat: _
+                                                                               }
+                                                                       }) => {
+                                                                               macro_rules! failure_handler {
+                                                                                       ($msg: expr, $err_code: expr, $err_data: expr, $phantom_ss: expr, $next_hop_unknown: expr) => {
+                                                                                               log_info!(self.logger, "Failed to accept/forward incoming HTLC: {}", $msg);
+
+                                                                                               let htlc_source = HTLCSource::PreviousHopData(HTLCPreviousHopData {
+                                                                                                       short_channel_id: prev_short_channel_id,
+                                                                                                       outpoint: prev_funding_outpoint,
+                                                                                                       htlc_id: prev_htlc_id,
+                                                                                                       incoming_packet_shared_secret: incoming_shared_secret,
+                                                                                                       phantom_shared_secret: $phantom_ss,
+                                                                                               });
+
+                                                                                               let reason = if $next_hop_unknown {
+                                                                                                       HTLCDestination::UnknownNextHop { requested_forward_scid: short_chan_id }
+                                                                                               } else {
+                                                                                                       HTLCDestination::FailedPayment{ payment_hash }
+                                                                                               };
+
+                                                                                               failed_forwards.push((htlc_source, payment_hash,
+                                                                                                       HTLCFailReason::Reason { failure_code: $err_code, data: $err_data },
+                                                                                                       reason
+                                                                                               ));
+                                                                                               continue;
                                                                                        }
-                                                                                       macro_rules! fail_forward {
-                                                                                               ($msg: expr, $err_code: expr, $err_data: expr, $phantom_ss: expr) => {
-                                                                                                       {
-                                                                                                               failure_handler!($msg, $err_code, $err_data, $phantom_ss, true);
-                                                                                                       }
+                                                                               }
+                                                                               macro_rules! fail_forward {
+                                                                                       ($msg: expr, $err_code: expr, $err_data: expr, $phantom_ss: expr) => {
+                                                                                               {
+                                                                                                       failure_handler!($msg, $err_code, $err_data, $phantom_ss, true);
                                                                                                }
                                                                                        }
-                                                                                       macro_rules! failed_payment {
-                                                                                               ($msg: expr, $err_code: expr, $err_data: expr, $phantom_ss: expr) => {
-                                                                                                       {
-                                                                                                               failure_handler!($msg, $err_code, $err_data, $phantom_ss, false);
-                                                                                                       }
+                                                                               }
+                                                                               macro_rules! failed_payment {
+                                                                                       ($msg: expr, $err_code: expr, $err_data: expr, $phantom_ss: expr) => {
+                                                                                               {
+                                                                                                       failure_handler!($msg, $err_code, $err_data, $phantom_ss, false);
                                                                                                }
                                                                                        }
-                                                                                       if let PendingHTLCRouting::Forward { onion_packet, .. } = routing {
-                                                                                               let phantom_secret_res = self.keys_manager.get_node_secret(Recipient::PhantomNode);
-                                                                                               if phantom_secret_res.is_ok() && fake_scid::is_valid_phantom(&self.fake_scid_rand_bytes, short_chan_id) {
-                                                                                                       let phantom_shared_secret = SharedSecret::new(&onion_packet.public_key.unwrap(), &phantom_secret_res.unwrap()).secret_bytes();
-                                                                                                       let next_hop = match onion_utils::decode_next_payment_hop(phantom_shared_secret, &onion_packet.hop_data, onion_packet.hmac, payment_hash) {
-                                                                                                               Ok(res) => res,
-                                                                                                               Err(onion_utils::OnionDecodeErr::Malformed { err_msg, err_code }) => {
-                                                                                                                       let sha256_of_onion = Sha256::hash(&onion_packet.hop_data).into_inner();
-                                                                                                                       // In this scenario, the phantom would have sent us an
-                                                                                                                       // `update_fail_malformed_htlc`, meaning here we encrypt the error as
-                                                                                                                       // if it came from us (the second-to-last hop) but contains the sha256
-                                                                                                                       // of the onion.
-                                                                                                                       failed_payment!(err_msg, err_code, sha256_of_onion.to_vec(), None);
-                                                                                                               },
-                                                                                                               Err(onion_utils::OnionDecodeErr::Relay { err_msg, err_code }) => {
-                                                                                                                       failed_payment!(err_msg, err_code, Vec::new(), Some(phantom_shared_secret));
-                                                                                                               },
-                                                                                                       };
-                                                                                                       match next_hop {
-                                                                                                               onion_utils::Hop::Receive(hop_data) => {
-                                                                                                                       match self.construct_recv_pending_htlc_info(hop_data, incoming_shared_secret, payment_hash, amt_to_forward, outgoing_cltv_value, Some(phantom_shared_secret)) {
-                                                                                                                               Ok(info) => phantom_receives.push((prev_short_channel_id, prev_funding_outpoint, vec![(info, prev_htlc_id)])),
-                                                                                                                               Err(ReceiveError { err_code, err_data, msg }) => failed_payment!(msg, err_code, err_data, Some(phantom_shared_secret))
-                                                                                                                       }
-                                                                                                               },
-                                                                                                               _ => panic!(),
-                                                                                                       }
-                                                                                               } else {
-                                                                                                       fail_forward!(format!("Unknown short channel id {} for forward HTLC", short_chan_id), 0x4000 | 10, Vec::new(), None);
+                                                                               }
+                                                                               if let PendingHTLCRouting::Forward { onion_packet, .. } = routing {
+                                                                                       let phantom_secret_res = self.keys_manager.get_node_secret(Recipient::PhantomNode);
+                                                                                       if phantom_secret_res.is_ok() && fake_scid::is_valid_phantom(&self.fake_scid_rand_bytes, short_chan_id, &self.genesis_hash) {
+                                                                                               let phantom_shared_secret = SharedSecret::new(&onion_packet.public_key.unwrap(), &phantom_secret_res.unwrap()).secret_bytes();
+                                                                                               let next_hop = match onion_utils::decode_next_payment_hop(phantom_shared_secret, &onion_packet.hop_data, onion_packet.hmac, payment_hash) {
+                                                                                                       Ok(res) => res,
+                                                                                                       Err(onion_utils::OnionDecodeErr::Malformed { err_msg, err_code }) => {
+                                                                                                               let sha256_of_onion = Sha256::hash(&onion_packet.hop_data).into_inner();
+                                                                                                               // In this scenario, the phantom would have sent us an
+                                                                                                               // `update_fail_malformed_htlc`, meaning here we encrypt the error as
+                                                                                                               // if it came from us (the second-to-last hop) but contains the sha256
+                                                                                                               // of the onion.
+                                                                                                               failed_payment!(err_msg, err_code, sha256_of_onion.to_vec(), None);
+                                                                                                       },
+                                                                                                       Err(onion_utils::OnionDecodeErr::Relay { err_msg, err_code }) => {
+                                                                                                               failed_payment!(err_msg, err_code, Vec::new(), Some(phantom_shared_secret));
+                                                                                                       },
+                                                                                               };
+                                                                                               match next_hop {
+                                                                                                       onion_utils::Hop::Receive(hop_data) => {
+                                                                                                               match self.construct_recv_pending_htlc_info(hop_data, incoming_shared_secret, payment_hash, outgoing_amt_msat, outgoing_cltv_value, Some(phantom_shared_secret)) {
+                                                                                                                       Ok(info) => phantom_receives.push((prev_short_channel_id, prev_funding_outpoint, vec![(info, prev_htlc_id)])),
+                                                                                                                       Err(ReceiveError { err_code, err_data, msg }) => failed_payment!(msg, err_code, err_data, Some(phantom_shared_secret))
+                                                                                                               }
+                                                                                                       },
+                                                                                                       _ => panic!(),
                                                                                                }
                                                                                        } else {
                                                                                                fail_forward!(format!("Unknown short channel id {} for forward HTLC", short_chan_id), 0x4000 | 10, Vec::new(), None);
                                                                                        }
-                                                                               },
+                                                                               } else {
+                                                                                       fail_forward!(format!("Unknown short channel id {} for forward HTLC", short_chan_id), 0x4000 | 10, Vec::new(), None);
+                                                                               }
+                                                                       },
                                                                        HTLCForwardInfo::FailHTLC { .. } => {
                                                                                // Channel went away before we could fail it. This implies
                                                                                // the channel is now on chain and our counterparty is
@@ -3222,143 +3234,158 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F
                                                                        }
                                                                }
                                                        }
+                                               }
+                                       }
+                                       let forward_chan_id = match self.short_to_chan_info.read().unwrap().get(&short_chan_id) {
+                                               Some((_cp_id, chan_id)) => chan_id.clone(),
+                                               None => {
+                                                       forwarding_channel_not_found!();
                                                        continue;
                                                }
                                        };
-                                       if let hash_map::Entry::Occupied(mut chan) = channel_state.by_id.entry(forward_chan_id) {
-                                               let mut add_htlc_msgs = Vec::new();
-                                               let mut fail_htlc_msgs = Vec::new();
-                                               for forward_info in pending_forwards.drain(..) {
-                                                       match forward_info {
-                                                               HTLCForwardInfo::AddHTLC { prev_short_channel_id, prev_htlc_id, forward_info: PendingHTLCInfo {
-                                                                               routing: PendingHTLCRouting::Forward {
-                                                                                       onion_packet, ..
-                                                                               }, incoming_shared_secret, payment_hash, amt_to_forward, outgoing_cltv_value },
-                                                                               prev_funding_outpoint } => {
-                                                                       log_trace!(self.logger, "Adding HTLC from short id {} with payment_hash {} to channel with short id {} after delay", prev_short_channel_id, log_bytes!(payment_hash.0), short_chan_id);
-                                                                       let htlc_source = HTLCSource::PreviousHopData(HTLCPreviousHopData {
-                                                                               short_channel_id: prev_short_channel_id,
-                                                                               outpoint: prev_funding_outpoint,
-                                                                               htlc_id: prev_htlc_id,
-                                                                               incoming_packet_shared_secret: incoming_shared_secret,
-                                                                               // Phantom payments are only PendingHTLCRouting::Receive.
-                                                                               phantom_shared_secret: None,
-                                                                       });
-                                                                       match chan.get_mut().send_htlc(amt_to_forward, payment_hash, outgoing_cltv_value, htlc_source.clone(), onion_packet, &self.logger) {
-                                                                               Err(e) => {
-                                                                                       if let ChannelError::Ignore(msg) = e {
-                                                                                               log_trace!(self.logger, "Failed to forward HTLC with payment_hash {}: {}", log_bytes!(payment_hash.0), msg);
-                                                                                       } else {
-                                                                                               panic!("Stated return value requirements in send_htlc() were not met");
-                                                                                       }
-                                                                                       let (failure_code, data) = self.get_htlc_temp_fail_err_and_data(0x1000|7, short_chan_id, chan.get());
-                                                                                       failed_forwards.push((htlc_source, payment_hash,
-                                                                                               HTLCFailReason::Reason { failure_code, data },
-                                                                                               HTLCDestination::NextHopChannel { node_id: Some(chan.get().get_counterparty_node_id()), channel_id: forward_chan_id }
-                                                                                       ));
-                                                                                       continue;
+                                       match channel_state.by_id.entry(forward_chan_id) {
+                                               hash_map::Entry::Vacant(_) => {
+                                                       forwarding_channel_not_found!();
+                                                       continue;
+                                               },
+                                               hash_map::Entry::Occupied(mut chan) => {
+                                                       let mut add_htlc_msgs = Vec::new();
+                                                       let mut fail_htlc_msgs = Vec::new();
+                                                       for forward_info in pending_forwards.drain(..) {
+                                                               match forward_info {
+                                                                       HTLCForwardInfo::AddHTLC(PendingAddHTLCInfo {
+                                                                               prev_short_channel_id, prev_htlc_id, prev_funding_outpoint ,
+                                                                               forward_info: PendingHTLCInfo {
+                                                                                       incoming_shared_secret, payment_hash, outgoing_amt_msat, outgoing_cltv_value,
+                                                                                       routing: PendingHTLCRouting::Forward { onion_packet, .. }, incoming_amt_msat: _,
                                                                                },
-                                                                               Ok(update_add) => {
-                                                                                       match update_add {
-                                                                                               Some(msg) => { add_htlc_msgs.push(msg); },
-                                                                                               None => {
-                                                                                                       // Nothing to do here...we're waiting on a remote
-                                                                                                       // revoke_and_ack before we can add anymore HTLCs. The Channel
-                                                                                                       // will automatically handle building the update_add_htlc and
-                                                                                                       // commitment_signed messages when we can.
-                                                                                                       // TODO: Do some kind of timer to set the channel as !is_live()
-                                                                                                       // as we don't really want others relying on us relaying through
-                                                                                                       // this channel currently :/.
+                                                                       }) => {
+                                                                               log_trace!(self.logger, "Adding HTLC from short id {} with payment_hash {} to channel with short id {} after delay", prev_short_channel_id, log_bytes!(payment_hash.0), short_chan_id);
+                                                                               let htlc_source = HTLCSource::PreviousHopData(HTLCPreviousHopData {
+                                                                                       short_channel_id: prev_short_channel_id,
+                                                                                       outpoint: prev_funding_outpoint,
+                                                                                       htlc_id: prev_htlc_id,
+                                                                                       incoming_packet_shared_secret: incoming_shared_secret,
+                                                                                       // Phantom payments are only PendingHTLCRouting::Receive.
+                                                                                       phantom_shared_secret: None,
+                                                                               });
+                                                                               match chan.get_mut().send_htlc(outgoing_amt_msat, payment_hash, outgoing_cltv_value, htlc_source.clone(), onion_packet, &self.logger) {
+                                                                                       Err(e) => {
+                                                                                               if let ChannelError::Ignore(msg) = e {
+                                                                                                       log_trace!(self.logger, "Failed to forward HTLC with payment_hash {}: {}", log_bytes!(payment_hash.0), msg);
+                                                                                               } else {
+                                                                                                       panic!("Stated return value requirements in send_htlc() were not met");
+                                                                                               }
+                                                                                               let (failure_code, data) = self.get_htlc_temp_fail_err_and_data(0x1000|7, short_chan_id, chan.get());
+                                                                                               failed_forwards.push((htlc_source, payment_hash,
+                                                                                                       HTLCFailReason::Reason { failure_code, data },
+                                                                                                       HTLCDestination::NextHopChannel { node_id: Some(chan.get().get_counterparty_node_id()), channel_id: forward_chan_id }
+                                                                                               ));
+                                                                                               continue;
+                                                                                       },
+                                                                                       Ok(update_add) => {
+                                                                                               match update_add {
+                                                                                                       Some(msg) => { add_htlc_msgs.push(msg); },
+                                                                                                       None => {
+                                                                                                               // Nothing to do here...we're waiting on a remote
+                                                                                                               // revoke_and_ack before we can add anymore HTLCs. The Channel
+                                                                                                               // will automatically handle building the update_add_htlc and
+                                                                                                               // commitment_signed messages when we can.
+                                                                                                               // TODO: Do some kind of timer to set the channel as !is_live()
+                                                                                                               // as we don't really want others relying on us relaying through
+                                                                                                               // this channel currently :/.
+                                                                                                       }
                                                                                                }
                                                                                        }
                                                                                }
-                                                                       }
-                                                               },
-                                                               HTLCForwardInfo::AddHTLC { .. } => {
-                                                                       panic!("short_channel_id != 0 should imply any pending_forward entries are of type Forward");
-                                                               },
-                                                               HTLCForwardInfo::FailHTLC { htlc_id, err_packet } => {
-                                                                       log_trace!(self.logger, "Failing HTLC back to channel with short id {} (backward HTLC ID {}) after delay", short_chan_id, htlc_id);
-                                                                       match chan.get_mut().get_update_fail_htlc(htlc_id, err_packet, &self.logger) {
-                                                                               Err(e) => {
-                                                                                       if let ChannelError::Ignore(msg) = e {
-                                                                                               log_trace!(self.logger, "Failed to fail HTLC with ID {} backwards to short_id {}: {}", htlc_id, short_chan_id, msg);
-                                                                                       } else {
-                                                                                               panic!("Stated return value requirements in get_update_fail_htlc() were not met");
+                                                                       },
+                                                                       HTLCForwardInfo::AddHTLC { .. } => {
+                                                                               panic!("short_channel_id != 0 should imply any pending_forward entries are of type Forward");
+                                                                       },
+                                                                       HTLCForwardInfo::FailHTLC { htlc_id, err_packet } => {
+                                                                               log_trace!(self.logger, "Failing HTLC back to channel with short id {} (backward HTLC ID {}) after delay", short_chan_id, htlc_id);
+                                                                               match chan.get_mut().get_update_fail_htlc(htlc_id, err_packet, &self.logger) {
+                                                                                       Err(e) => {
+                                                                                               if let ChannelError::Ignore(msg) = e {
+                                                                                                       log_trace!(self.logger, "Failed to fail HTLC with ID {} backwards to short_id {}: {}", htlc_id, short_chan_id, msg);
+                                                                                               } else {
+                                                                                                       panic!("Stated return value requirements in get_update_fail_htlc() were not met");
+                                                                                               }
+                                                                                               // fail-backs are best-effort, we probably already have one
+                                                                                               // pending, and if not that's OK, if not, the channel is on
+                                                                                               // the chain and sending the HTLC-Timeout is their problem.
+                                                                                               continue;
+                                                                                       },
+                                                                                       Ok(Some(msg)) => { fail_htlc_msgs.push(msg); },
+                                                                                       Ok(None) => {
+                                                                                               // Nothing to do here...we're waiting on a remote
+                                                                                               // revoke_and_ack before we can update the commitment
+                                                                                               // transaction. The Channel will automatically handle
+                                                                                               // building the update_fail_htlc and commitment_signed
+                                                                                               // messages when we can.
+                                                                                               // We don't need any kind of timer here as they should fail
+                                                                                               // the channel onto the chain if they can't get our
+                                                                                               // update_fail_htlc in time, it's not our problem.
                                                                                        }
-                                                                                       // fail-backs are best-effort, we probably already have one
-                                                                                       // pending, and if not that's OK, if not, the channel is on
-                                                                                       // the chain and sending the HTLC-Timeout is their problem.
-                                                                                       continue;
-                                                                               },
-                                                                               Ok(Some(msg)) => { fail_htlc_msgs.push(msg); },
-                                                                               Ok(None) => {
-                                                                                       // Nothing to do here...we're waiting on a remote
-                                                                                       // revoke_and_ack before we can update the commitment
-                                                                                       // transaction. The Channel will automatically handle
-                                                                                       // building the update_fail_htlc and commitment_signed
-                                                                                       // messages when we can.
-                                                                                       // We don't need any kind of timer here as they should fail
-                                                                                       // the channel onto the chain if they can't get our
-                                                                                       // update_fail_htlc in time, it's not our problem.
                                                                                }
-                                                                       }
-                                                               },
+                                                                       },
+                                                               }
                                                        }
-                                               }
 
-                                               if !add_htlc_msgs.is_empty() || !fail_htlc_msgs.is_empty() {
-                                                       let (commitment_msg, monitor_update) = match chan.get_mut().send_commitment(&self.logger) {
-                                                               Ok(res) => res,
-                                                               Err(e) => {
-                                                                       // We surely failed send_commitment due to bad keys, in that case
-                                                                       // close channel and then send error message to peer.
-                                                                       let counterparty_node_id = chan.get().get_counterparty_node_id();
-                                                                       let err: Result<(), _>  = match e {
-                                                                               ChannelError::Ignore(_) | ChannelError::Warn(_) => {
-                                                                                       panic!("Stated return value requirements in send_commitment() were not met");
-                                                                               }
-                                                                               ChannelError::Close(msg) => {
-                                                                                       log_trace!(self.logger, "Closing channel {} due to Close-required error: {}", log_bytes!(chan.key()[..]), msg);
-                                                                                       let mut channel = remove_channel!(self, channel_state, chan);
-                                                                                       // ChannelClosed event is generated by handle_error for us.
-                                                                                       Err(MsgHandleErrInternal::from_finish_shutdown(msg, channel.channel_id(), channel.get_user_id(), channel.force_shutdown(true), self.get_channel_update_for_broadcast(&channel).ok()))
-                                                                               },
-                                                                       };
-                                                                       handle_errors.push((counterparty_node_id, err));
-                                                                       continue;
-                                                               }
-                                                       };
-                                                       match self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), monitor_update) {
-                                                               ChannelMonitorUpdateStatus::Completed => {},
-                                                               e => {
-                                                                       handle_errors.push((chan.get().get_counterparty_node_id(), handle_monitor_update_res!(self, e, chan, RAACommitmentOrder::CommitmentFirst, false, true)));
-                                                                       continue;
+                                                       if !add_htlc_msgs.is_empty() || !fail_htlc_msgs.is_empty() {
+                                                               let (commitment_msg, monitor_update) = match chan.get_mut().send_commitment(&self.logger) {
+                                                                       Ok(res) => res,
+                                                                       Err(e) => {
+                                                                               // We surely failed send_commitment due to bad keys, in that case
+                                                                               // close channel and then send error message to peer.
+                                                                               let counterparty_node_id = chan.get().get_counterparty_node_id();
+                                                                               let err: Result<(), _>  = match e {
+                                                                                       ChannelError::Ignore(_) | ChannelError::Warn(_) => {
+                                                                                               panic!("Stated return value requirements in send_commitment() were not met");
+                                                                                       }
+                                                                                       ChannelError::Close(msg) => {
+                                                                                               log_trace!(self.logger, "Closing channel {} due to Close-required error: {}", log_bytes!(chan.key()[..]), msg);
+                                                                                               let mut channel = remove_channel!(self, chan);
+                                                                                               // ChannelClosed event is generated by handle_error for us.
+                                                                                               Err(MsgHandleErrInternal::from_finish_shutdown(msg, channel.channel_id(), channel.get_user_id(), channel.force_shutdown(true), self.get_channel_update_for_broadcast(&channel).ok()))
+                                                                                       },
+                                                                               };
+                                                                               handle_errors.push((counterparty_node_id, err));
+                                                                               continue;
+                                                                       }
+                                                               };
+                                                               match self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), monitor_update) {
+                                                                       ChannelMonitorUpdateStatus::Completed => {},
+                                                                       e => {
+                                                                               handle_errors.push((chan.get().get_counterparty_node_id(), handle_monitor_update_res!(self, e, chan, RAACommitmentOrder::CommitmentFirst, false, true)));
+                                                                               continue;
+                                                                       }
                                                                }
+                                                               log_debug!(self.logger, "Forwarding HTLCs resulted in a commitment update with {} HTLCs added and {} HTLCs failed for channel {}",
+                                                                       add_htlc_msgs.len(), fail_htlc_msgs.len(), log_bytes!(chan.get().channel_id()));
+                                                               channel_state.pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs {
+                                                                       node_id: chan.get().get_counterparty_node_id(),
+                                                                       updates: msgs::CommitmentUpdate {
+                                                                               update_add_htlcs: add_htlc_msgs,
+                                                                               update_fulfill_htlcs: Vec::new(),
+                                                                               update_fail_htlcs: fail_htlc_msgs,
+                                                                               update_fail_malformed_htlcs: Vec::new(),
+                                                                               update_fee: None,
+                                                                               commitment_signed: commitment_msg,
+                                                                       },
+                                                               });
                                                        }
-                                                       log_debug!(self.logger, "Forwarding HTLCs resulted in a commitment update with {} HTLCs added and {} HTLCs failed for channel {}",
-                                                               add_htlc_msgs.len(), fail_htlc_msgs.len(), log_bytes!(chan.get().channel_id()));
-                                                       channel_state.pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs {
-                                                               node_id: chan.get().get_counterparty_node_id(),
-                                                               updates: msgs::CommitmentUpdate {
-                                                                       update_add_htlcs: add_htlc_msgs,
-                                                                       update_fulfill_htlcs: Vec::new(),
-                                                                       update_fail_htlcs: fail_htlc_msgs,
-                                                                       update_fail_malformed_htlcs: Vec::new(),
-                                                                       update_fee: None,
-                                                                       commitment_signed: commitment_msg,
-                                                               },
-                                                       });
                                                }
-                                       } else {
-                                               unreachable!();
                                        }
                                } else {
                                        for forward_info in pending_forwards.drain(..) {
                                                match forward_info {
-                                                       HTLCForwardInfo::AddHTLC { prev_short_channel_id, prev_htlc_id, forward_info: PendingHTLCInfo {
-                                                                       routing, incoming_shared_secret, payment_hash, amt_to_forward, .. },
-                                                                       prev_funding_outpoint } => {
+                                                       HTLCForwardInfo::AddHTLC(PendingAddHTLCInfo {
+                                                               prev_short_channel_id, prev_htlc_id, prev_funding_outpoint,
+                                                               forward_info: PendingHTLCInfo {
+                                                                       routing, incoming_shared_secret, payment_hash, outgoing_amt_msat, ..
+                                                               }
+                                                       }) => {
                                                                let (cltv_expiry, onion_payload, payment_data, phantom_shared_secret) = match routing {
                                                                        PendingHTLCRouting::Receive { payment_data, incoming_cltv_expiry, phantom_shared_secret } => {
                                                                                let _legacy_hop_data = Some(payment_data.clone());
@@ -3378,9 +3405,9 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F
                                                                                incoming_packet_shared_secret: incoming_shared_secret,
                                                                                phantom_shared_secret,
                                                                        },
-                                                                       value: amt_to_forward,
+                                                                       value: outgoing_amt_msat,
                                                                        timer_ticks: 0,
-                                                                       total_msat: if let Some(data) = &payment_data { data.total_msat } else { amt_to_forward },
+                                                                       total_msat: if let Some(data) = &payment_data { data.total_msat } else { outgoing_amt_msat },
                                                                        cltv_expiry,
                                                                        onion_payload,
                                                                };
@@ -3487,7 +3514,7 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F
                                                                                                                e.insert((purpose.clone(), vec![claimable_htlc]));
                                                                                                                new_events.push(events::Event::PaymentReceived {
                                                                                                                        payment_hash,
-                                                                                                                       amount_msat: amt_to_forward,
+                                                                                                                       amount_msat: outgoing_amt_msat,
                                                                                                                        purpose,
                                                                                                                });
                                                                                                        },
@@ -4310,7 +4337,7 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F
                                        return ClaimFundsFromHop::MonitorUpdateFail(counterparty_node_id, res, None);
                                },
                        }
-               } else { unreachable!(); }
+               } else { return ClaimFundsFromHop::PrevHopForceClosed }
        }
 
        fn finalize_claims(&self, mut sources: Vec<HTLCSource>) {
@@ -4558,7 +4585,7 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F
                                                }
                                        };
                                        channel_state.pending_msg_events.push(send_msg_err_event);
-                                       let _ = remove_channel!(self, channel_state, channel);
+                                       let _ = remove_channel!(self, channel);
                                        return Err(APIError::APIMisuseError { err: "Please use accept_inbound_channel_from_trusted_peer_0conf to accept channels with zero confirmations.".to_owned() });
                                }
 
@@ -4638,7 +4665,7 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F
                                        if chan.get().get_counterparty_node_id() != *counterparty_node_id {
                                                return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!".to_owned(), msg.temporary_channel_id));
                                        }
-                                       try_chan_entry!(self, chan.get_mut().accept_channel(&msg, &self.default_configuration.channel_handshake_limits, &their_features), channel_state, chan);
+                                       try_chan_entry!(self, chan.get_mut().accept_channel(&msg, &self.default_configuration.channel_handshake_limits, &their_features), chan);
                                        (chan.get().get_value_satoshis(), chan.get().get_funding_redeemscript().to_v0_p2wsh(), chan.get().get_user_id())
                                },
                                hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel".to_owned(), msg.temporary_channel_id))
@@ -4665,7 +4692,7 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F
                                        if chan.get().get_counterparty_node_id() != *counterparty_node_id {
                                                return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!".to_owned(), msg.temporary_channel_id));
                                        }
-                                       (try_chan_entry!(self, chan.get_mut().funding_created(msg, best_block, &self.logger), channel_state, chan), chan.remove())
+                                       (try_chan_entry!(self, chan.get_mut().funding_created(msg, best_block, &self.logger), chan), chan.remove())
                                },
                                hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel".to_owned(), msg.temporary_channel_id))
                        }
@@ -4738,7 +4765,7 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F
                                        }
                                        let (monitor, funding_tx, channel_ready) = match chan.get_mut().funding_signed(&msg, best_block, &self.logger) {
                                                Ok(update) => update,
-                                               Err(e) => try_chan_entry!(self, Err(e), channel_state, chan),
+                                               Err(e) => try_chan_entry!(self, Err(e), chan),
                                        };
                                        match self.chain_monitor.watch_channel(chan.get().get_funding_txo().unwrap(), monitor) {
                                                ChannelMonitorUpdateStatus::Completed => {},
@@ -4777,7 +4804,7 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F
                                        return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!".to_owned(), msg.channel_id));
                                }
                                let announcement_sigs_opt = try_chan_entry!(self, chan.get_mut().channel_ready(&msg, self.get_our_node_id(),
-                                       self.genesis_hash.clone(), &self.best_block.read().unwrap(), &self.logger), channel_state, chan);
+                                       self.genesis_hash.clone(), &self.best_block.read().unwrap(), &self.logger), chan);
                                if let Some(announcement_sigs) = announcement_sigs_opt {
                                        log_trace!(self.logger, "Sending announcement_signatures for channel {}", log_bytes!(chan.get().channel_id()));
                                        channel_state.pending_msg_events.push(events::MessageSendEvent::SendAnnouncementSignatures {
@@ -4825,7 +4852,7 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F
                                                        if chan_entry.get().sent_shutdown() { " after we initiated shutdown" } else { "" });
                                        }
 
-                                       let (shutdown, monitor_update, htlcs) = try_chan_entry!(self, chan_entry.get_mut().shutdown(&self.keys_manager, &their_features, &msg), channel_state, chan_entry);
+                                       let (shutdown, monitor_update, htlcs) = try_chan_entry!(self, chan_entry.get_mut().shutdown(&self.keys_manager, &their_features, &msg), chan_entry);
                                        dropped_htlcs = htlcs;
 
                                        // Update the monitor with the shutdown script if necessary.
@@ -4834,7 +4861,7 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F
                                                let (result, is_permanent) =
                                                        handle_monitor_update_res!(self, update_res, chan_entry.get_mut(), RAACommitmentOrder::CommitmentFirst, chan_entry.key(), NO_UPDATE);
                                                if is_permanent {
-                                                       remove_channel!(self, channel_state, chan_entry);
+                                                       remove_channel!(self, chan_entry);
                                                        break result;
                                                }
                                        }
@@ -4869,7 +4896,7 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F
                                        if chan_entry.get().get_counterparty_node_id() != *counterparty_node_id {
                                                return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!".to_owned(), msg.channel_id));
                                        }
-                                       let (closing_signed, tx) = try_chan_entry!(self, chan_entry.get_mut().closing_signed(&self.fee_estimator, &msg), channel_state, chan_entry);
+                                       let (closing_signed, tx) = try_chan_entry!(self, chan_entry.get_mut().closing_signed(&self.fee_estimator, &msg), chan_entry);
                                        if let Some(msg) = closing_signed {
                                                channel_state.pending_msg_events.push(events::MessageSendEvent::SendClosingSigned {
                                                        node_id: counterparty_node_id.clone(),
@@ -4882,7 +4909,7 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F
                                                // also implies there are no pending HTLCs left on the channel, so we can
                                                // fully delete it from tracking (the channel monitor is still around to
                                                // watch for old state broadcasts)!
-                                               (tx, Some(remove_channel!(self, channel_state, chan_entry)))
+                                               (tx, Some(remove_channel!(self, chan_entry)))
                                        } else { (tx, None) }
                                },
                                hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel".to_owned(), msg.channel_id))
@@ -4946,7 +4973,7 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F
                                                _ => pending_forward_info
                                        }
                                };
-                               try_chan_entry!(self, chan.get_mut().update_add_htlc(&msg, pending_forward_info, create_pending_htlc_status, &self.logger), channel_state, chan);
+                               try_chan_entry!(self, chan.get_mut().update_add_htlc(&msg, pending_forward_info, create_pending_htlc_status, &self.logger), chan);
                        },
                        hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel".to_owned(), msg.channel_id))
                }
@@ -4962,7 +4989,7 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F
                                        if chan.get().get_counterparty_node_id() != *counterparty_node_id {
                                                return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!".to_owned(), msg.channel_id));
                                        }
-                                       try_chan_entry!(self, chan.get_mut().update_fulfill_htlc(&msg), channel_state, chan)
+                                       try_chan_entry!(self, chan.get_mut().update_fulfill_htlc(&msg), chan)
                                },
                                hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel".to_owned(), msg.channel_id))
                        }
@@ -4979,7 +5006,7 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F
                                if chan.get().get_counterparty_node_id() != *counterparty_node_id {
                                        return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!".to_owned(), msg.channel_id));
                                }
-                               try_chan_entry!(self, chan.get_mut().update_fail_htlc(&msg, HTLCFailReason::LightningError { err: msg.reason.clone() }), channel_state, chan);
+                               try_chan_entry!(self, chan.get_mut().update_fail_htlc(&msg, HTLCFailReason::LightningError { err: msg.reason.clone() }), chan);
                        },
                        hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel".to_owned(), msg.channel_id))
                }
@@ -4996,9 +5023,9 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F
                                }
                                if (msg.failure_code & 0x8000) == 0 {
                                        let chan_err: ChannelError = ChannelError::Close("Got update_fail_malformed_htlc with BADONION not set".to_owned());
-                                       try_chan_entry!(self, Err(chan_err), channel_state, chan);
+                                       try_chan_entry!(self, Err(chan_err), chan);
                                }
-                               try_chan_entry!(self, chan.get_mut().update_fail_malformed_htlc(&msg, HTLCFailReason::Reason { failure_code: msg.failure_code, data: Vec::new() }), channel_state, chan);
+                               try_chan_entry!(self, chan.get_mut().update_fail_malformed_htlc(&msg, HTLCFailReason::Reason { failure_code: msg.failure_code, data: Vec::new() }), chan);
                                Ok(())
                        },
                        hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel".to_owned(), msg.channel_id))
@@ -5015,11 +5042,11 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F
                                }
                                let (revoke_and_ack, commitment_signed, monitor_update) =
                                        match chan.get_mut().commitment_signed(&msg, &self.logger) {
-                                               Err((None, e)) => try_chan_entry!(self, Err(e), channel_state, chan),
+                                               Err((None, e)) => try_chan_entry!(self, Err(e), chan),
                                                Err((Some(update), e)) => {
                                                        assert!(chan.get().is_awaiting_monitor_update());
                                                        let _ = self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), update);
-                                                       try_chan_entry!(self, Err(e), channel_state, chan);
+                                                       try_chan_entry!(self, Err(e), chan);
                                                        unreachable!();
                                                },
                                                Ok(res) => res
@@ -5068,12 +5095,12 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F
                                                        PendingHTLCRouting::ReceiveKeysend { .. } => 0,
                                        }) {
                                                hash_map::Entry::Occupied(mut entry) => {
-                                                       entry.get_mut().push(HTLCForwardInfo::AddHTLC { prev_short_channel_id, prev_funding_outpoint,
-                                                                                                       prev_htlc_id, forward_info });
+                                                       entry.get_mut().push(HTLCForwardInfo::AddHTLC(PendingAddHTLCInfo {
+                                                               prev_short_channel_id, prev_funding_outpoint, prev_htlc_id, forward_info }));
                                                },
                                                hash_map::Entry::Vacant(entry) => {
-                                                       entry.insert(vec!(HTLCForwardInfo::AddHTLC { prev_short_channel_id, prev_funding_outpoint,
-                                                                                                    prev_htlc_id, forward_info }));
+                                                       entry.insert(vec!(HTLCForwardInfo::AddHTLC(PendingAddHTLCInfo {
+                                                               prev_short_channel_id, prev_funding_outpoint, prev_htlc_id, forward_info })));
                                                }
                                        }
                                }
@@ -5102,7 +5129,7 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F
                                        }
                                        let was_paused_for_mon_update = chan.get().is_awaiting_monitor_update();
                                        let raa_updates = break_chan_entry!(self,
-                                               chan.get_mut().revoke_and_ack(&msg, &self.logger), channel_state, chan);
+                                               chan.get_mut().revoke_and_ack(&msg, &self.logger), chan);
                                        htlcs_to_fail = raa_updates.holding_cell_failed_htlcs;
                                        let update_res = self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), raa_updates.monitor_update);
                                        if was_paused_for_mon_update {
@@ -5162,7 +5189,7 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F
                                if chan.get().get_counterparty_node_id() != *counterparty_node_id {
                                        return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!".to_owned(), msg.channel_id));
                                }
-                               try_chan_entry!(self, chan.get_mut().update_fee(&self.fee_estimator, &msg), channel_state, chan);
+                               try_chan_entry!(self, chan.get_mut().update_fee(&self.fee_estimator, &msg), chan);
                        },
                        hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel".to_owned(), msg.channel_id))
                }
@@ -5184,7 +5211,7 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F
 
                                channel_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelAnnouncement {
                                        msg: try_chan_entry!(self, chan.get_mut().announcement_signatures(
-                                               self.get_our_node_id(), self.genesis_hash.clone(), self.best_block.read().unwrap().height(), msg), channel_state, chan),
+                                               self.get_our_node_id(), self.genesis_hash.clone(), self.best_block.read().unwrap().height(), msg), chan),
                                        // Note that announcement_signatures fails if the channel cannot be announced,
                                        // so get_channel_update_for_broadcast will never fail by the time we get here.
                                        update_msg: self.get_channel_update_for_broadcast(chan.get()).unwrap(),
@@ -5223,10 +5250,10 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F
                                        return Ok(NotifyOption::SkipPersist);
                                } else {
                                        log_debug!(self.logger, "Received channel_update for channel {}.", log_bytes!(chan_id));
-                                       try_chan_entry!(self, chan.get_mut().channel_update(&msg), channel_state, chan);
+                                       try_chan_entry!(self, chan.get_mut().channel_update(&msg), chan);
                                }
                        },
-                       hash_map::Entry::Vacant(_) => unreachable!()
+                       hash_map::Entry::Vacant(_) => return Ok(NotifyOption::SkipPersist)
                }
                Ok(NotifyOption::DoPersist)
        }
@@ -5248,7 +5275,7 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F
                                        // add-HTLCs on disconnect, we may be handed HTLCs to fail backwards here.
                                        let responses = try_chan_entry!(self, chan.get_mut().channel_reestablish(
                                                msg, &self.logger, self.our_network_pubkey.clone(), self.genesis_hash,
-                                               &*self.best_block.read().unwrap()), channel_state, chan);
+                                               &*self.best_block.read().unwrap()), chan);
                                        let mut channel_update = None;
                                        if let Some(msg) = responses.shutdown_msg {
                                                channel_state.pending_msg_events.push(events::MessageSendEvent::SendShutdown {
@@ -5312,7 +5339,7 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F
                                                let by_id = &mut channel_state.by_id;
                                                let pending_msg_events = &mut channel_state.pending_msg_events;
                                                if let hash_map::Entry::Occupied(chan_entry) = by_id.entry(funding_outpoint.to_channel_id()) {
-                                                       let mut chan = remove_channel!(self, channel_state, chan_entry);
+                                                       let mut chan = remove_channel!(self, chan_entry);
                                                        failed_channels.push(chan.force_shutdown(false));
                                                        if let Ok(update) = self.get_channel_update_for_broadcast(&chan) {
                                                                pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
@@ -5679,7 +5706,7 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F
        #[cfg(any(test, fuzzing, feature = "_test_utils"))]
        pub fn get_and_clear_pending_events(&self) -> Vec<events::Event> {
                let events = core::cell::RefCell::new(Vec::new());
-               let event_handler = |event: &events::Event| events.borrow_mut().push(event.clone());
+               let event_handler = |event: events::Event| events.borrow_mut().push(event);
                self.process_pending_events(&event_handler);
                events.into_inner()
        }
@@ -5693,6 +5720,39 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F
        pub fn clear_pending_payments(&self) {
                self.pending_outbound_payments.lock().unwrap().clear()
        }
+
+       /// Processes any events asynchronously in the order they were generated since the last call
+       /// using the given event handler.
+       ///
+       /// See the trait-level documentation of [`EventsProvider`] for requirements.
+       pub async fn process_pending_events_async<Future: core::future::Future, H: Fn(Event) -> Future>(
+               &self, handler: H
+       ) {
+               // We'll acquire our total consistency lock until the returned future completes so that
+               // we can be sure no other persists happen while processing events.
+               let _read_guard = self.total_consistency_lock.read().unwrap();
+
+               let mut result = NotifyOption::SkipPersist;
+
+               // TODO: This behavior should be documented. It's unintuitive that we query
+               // ChannelMonitors when clearing other events.
+               if self.process_pending_monitor_events() {
+                       result = NotifyOption::DoPersist;
+               }
+
+               let pending_events = mem::replace(&mut *self.pending_events.lock().unwrap(), vec![]);
+               if !pending_events.is_empty() {
+                       result = NotifyOption::DoPersist;
+               }
+
+               for event in pending_events {
+                       handler(event).await;
+               }
+
+               if result == NotifyOption::DoPersist {
+                       self.persistence_notifier.notify();
+               }
+       }
 }
 
 impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> MessageSendEventsProvider for ChannelManager<M, T, K, F, L>
@@ -5756,13 +5816,13 @@ where
                                result = NotifyOption::DoPersist;
                        }
 
-                       let mut pending_events = mem::replace(&mut *self.pending_events.lock().unwrap(), vec![]);
+                       let pending_events = mem::replace(&mut *self.pending_events.lock().unwrap(), vec![]);
                        if !pending_events.is_empty() {
                                result = NotifyOption::DoPersist;
                        }
 
-                       for event in pending_events.drain(..) {
-                               handler.handle_event(&event);
+                       for event in pending_events {
+                               handler.handle_event(event);
                        }
 
                        result
@@ -5871,12 +5931,12 @@ where
                });
        }
 
-       fn get_relevant_txids(&self) -> Vec<Txid> {
+       fn get_relevant_txids(&self) -> Vec<(Txid, Option<BlockHash>)> {
                let channel_state = self.channel_state.lock().unwrap();
                let mut res = Vec::with_capacity(channel_state.by_id.len());
                for chan in channel_state.by_id.values() {
-                       if let Some(funding_txo) = chan.get_funding_txo() {
-                               res.push(funding_txo.txid);
+                       if let (Some(funding_txo), block_hash) = (chan.get_funding_txo(), chan.get_funding_tx_confirmed_in()) {
+                               res.push((funding_txo.txid, block_hash));
                        }
                }
                res
@@ -6437,8 +6497,9 @@ impl_writeable_tlv_based!(PendingHTLCInfo, {
        (0, routing, required),
        (2, incoming_shared_secret, required),
        (4, payment_hash, required),
-       (6, amt_to_forward, required),
-       (8, outgoing_cltv_value, required)
+       (6, outgoing_amt_msat, required),
+       (8, outgoing_cltv_value, required),
+       (9, incoming_amt_msat, option),
 });
 
 
@@ -6637,7 +6698,7 @@ impl Writeable for HTLCSource {
                                        (1, payment_id_opt, option),
                                        (2, first_hop_htlc_msat, required),
                                        (3, payment_secret, option),
-                                       (4, path, vec_type),
+                                       (4, *path, vec_type),
                                        (5, payment_params, option),
                                 });
                        }
@@ -6660,18 +6721,20 @@ impl_writeable_tlv_based_enum!(HTLCFailReason,
        },
 ;);
 
+impl_writeable_tlv_based!(PendingAddHTLCInfo, {
+       (0, forward_info, required),
+       (2, prev_short_channel_id, required),
+       (4, prev_htlc_id, required),
+       (6, prev_funding_outpoint, required),
+});
+
 impl_writeable_tlv_based_enum!(HTLCForwardInfo,
-       (0, AddHTLC) => {
-               (0, forward_info, required),
-               (2, prev_short_channel_id, required),
-               (4, prev_htlc_id, required),
-               (6, prev_funding_outpoint, required),
-       },
        (1, FailHTLC) => {
                (0, htlc_id, required),
                (2, err_packet, required),
-       },
-;);
+       };
+       (0, AddHTLC)
+);
 
 impl_writeable_tlv_based!(PendingInboundPayment, {
        (0, payment_secret, required),