Consider channels "live" even if they are awaiting a monitor update
[rust-lightning] / lightning / src / ln / channelmanager.rs
index 39426b3fa7b2656b7667c29b94d94c50d6b4cc96..292fa26733bcea467d69defe85c5e4e47b7f2804 100644 (file)
@@ -36,14 +36,16 @@ use bitcoin::secp256k1::ecdh::SharedSecret;
 use bitcoin::secp256k1;
 
 use chain;
+use chain::Confirm;
 use chain::Watch;
 use chain::chaininterface::{BroadcasterInterface, FeeEstimator};
 use chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, ChannelMonitorUpdateStep, ChannelMonitorUpdateErr, HTLC_FAIL_BACK_BUFFER, CLTV_CLAIM_BUFFER, LATENCY_GRACE_PERIOD_BLOCKS, ANTI_REORG_DELAY, MonitorEvent, CLOSED_CHANNEL_UPDATE_ID};
 use chain::transaction::{OutPoint, TransactionData};
 // Since this struct is returned in `list_channels` methods, expose it here in case users want to
 // construct one themselves.
+use ln::{PaymentHash, PaymentPreimage, PaymentSecret};
 pub use ln::channel::CounterpartyForwardingInfo;
-use ln::channel::{Channel, ChannelError};
+use ln::channel::{Channel, ChannelError, ChannelUpdateStatus};
 use ln::features::{InitFeatures, NodeFeatures};
 use routing::router::{Route, RouteHop};
 use ln::msgs;
@@ -52,23 +54,24 @@ use ln::onion_utils;
 use ln::msgs::{ChannelMessageHandler, DecodeError, LightningError, OptionalField};
 use chain::keysinterface::{Sign, KeysInterface, KeysManager, InMemorySigner};
 use util::config::UserConfig;
-use util::events::{Event, EventsProvider, MessageSendEvent, MessageSendEventsProvider};
+use util::events::{EventHandler, EventsProvider, MessageSendEvent, MessageSendEventsProvider};
 use util::{byte_utils, events};
 use util::ser::{Readable, ReadableArgs, MaybeReadable, Writeable, Writer};
 use util::chacha20::{ChaCha20, ChaChaReader};
 use util::logger::Logger;
 use util::errors::APIError;
 
-use std::{cmp, mem};
+use prelude::*;
+use core::{cmp, mem};
+use core::cell::RefCell;
 use std::collections::{HashMap, hash_map, HashSet};
 use std::io::{Cursor, Read};
 use std::sync::{Arc, Condvar, Mutex, MutexGuard, RwLock, RwLockReadGuard};
-use std::sync::atomic::{AtomicUsize, Ordering};
-use std::time::Duration;
+use core::sync::atomic::{AtomicUsize, Ordering};
+use core::time::Duration;
 #[cfg(any(test, feature = "allow_wallclock_use"))]
 use std::time::Instant;
-use std::marker::{Sync, Send};
-use std::ops::Deref;
+use core::ops::Deref;
 use bitcoin::hashes::hex::ToHex;
 
 // We hold various information about HTLC relay in the HTLC objects in Channel itself:
@@ -95,7 +98,7 @@ enum PendingHTLCRouting {
                short_channel_id: u64, // This should be NonZero<u64> eventually when we bump MSRV
        },
        Receive {
-               payment_data: Option<msgs::FinalOnionHopData>,
+               payment_data: msgs::FinalOnionHopData,
                incoming_cltv_expiry: u32, // Used to track when we should expire pending HTLCs that go unclaimed
        },
 }
@@ -155,11 +158,10 @@ pub(crate) struct HTLCPreviousHopData {
 struct ClaimableHTLC {
        prev_hop: HTLCPreviousHopData,
        value: u64,
-       /// Filled in when the HTLC was received with a payment_secret packet, which contains a
-       /// total_msat (which may differ from value if this is a Multi-Path Payment) and a
+       /// Contains a total_msat (which may differ from value if this is a Multi-Path Payment) and a
        /// payment_secret which prevents path-probing attacks and can associate different HTLCs which
        /// are part of the same payment.
-       payment_data: Option<msgs::FinalOnionHopData>,
+       payment_data: msgs::FinalOnionHopData,
        cltv_expiry: u32,
 }
 
@@ -197,19 +199,6 @@ pub(super) enum HTLCFailReason {
        }
 }
 
-/// payment_hash type, use to cross-lock hop
-/// (C-not exported) as we just use [u8; 32] directly
-#[derive(Hash, Copy, Clone, PartialEq, Eq, Debug)]
-pub struct PaymentHash(pub [u8;32]);
-/// payment_preimage type, use to route payment between hop
-/// (C-not exported) as we just use [u8; 32] directly
-#[derive(Hash, Copy, Clone, PartialEq, Eq, Debug)]
-pub struct PaymentPreimage(pub [u8;32]);
-/// payment_secret type, use to authenticate sender to the receiver and tie MPP HTLCs together
-/// (C-not exported) as we just use [u8; 32] directly
-#[derive(Hash, Copy, Clone, PartialEq, Eq, Debug)]
-pub struct PaymentSecret(pub [u8;32]);
-
 type ShutdownResult = (Option<(OutPoint, ChannelMonitorUpdate)>, Vec<(HTLCSource, PaymentHash)>);
 
 /// Error type returned across the channel_state mutex boundary. When an Err is generated for a
@@ -326,12 +315,11 @@ pub(super) struct ChannelHolder<Signer: Sign> {
        /// guarantees are made about the existence of a channel with the short id here, nor the short
        /// ids in the PendingHTLCInfo!
        pub(super) forward_htlcs: HashMap<u64, Vec<HTLCForwardInfo>>,
-       /// (payment_hash, payment_secret) -> Vec<HTLCs> for tracking HTLCs that
-       /// were to us and can be failed/claimed by the user
+       /// Map from payment hash to any HTLCs which are to us and can be failed/claimed by the user.
        /// Note that while this is held in the same mutex as the channels themselves, no consistency
        /// guarantees are made about the channels given here actually existing anymore by the time you
        /// go to read them!
-       claimable_htlcs: HashMap<(PaymentHash, Option<PaymentSecret>), Vec<ClaimableHTLC>>,
+       claimable_htlcs: HashMap<PaymentHash, Vec<ClaimableHTLC>>,
        /// Messages to send to peers - pushed to in the same lock that they are generated in (except
        /// for broadcast messages, where ordering isn't as strict).
        pub(super) pending_msg_events: Vec<MessageSendEvent>,
@@ -352,6 +340,24 @@ struct PeerState {
        latest_features: InitFeatures,
 }
 
+/// Stores a PaymentSecret and any other data we may need to validate an inbound payment is
+/// actually ours and not some duplicate HTLC sent to us by a node along the route.
+///
+/// For users who don't want to bother doing their own payment preimage storage, we also store that
+/// here.
+struct PendingInboundPayment {
+       /// The payment secret that the sender must use for us to accept this payment
+       payment_secret: PaymentSecret,
+       /// Time at which this HTLC expires - blocks with a header time above this value will result in
+       /// this payment being removed.
+       expiry_time: u64,
+       /// Arbitrary identifier the user specifies (or not)
+       user_payment_id: u64,
+       // Other required attributes of the payment, optionally enforced:
+       payment_preimage: Option<PaymentPreimage>,
+       min_value_msat: Option<u64>,
+}
+
 /// SimpleArcChannelManager is useful when you need a ChannelManager with a static lifetime, e.g.
 /// when you're using lightning-net-tokio (since tokio::spawn requires parameters with static
 /// lifetimes). Other times you can afford a reference, which is more efficient, in which case
@@ -430,6 +436,26 @@ pub struct ChannelManager<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref,
        pub(super) channel_state: Mutex<ChannelHolder<Signer>>,
        #[cfg(not(any(test, feature = "_test_utils")))]
        channel_state: Mutex<ChannelHolder<Signer>>,
+
+       /// Storage for PaymentSecrets and any requirements on future inbound payments before we will
+       /// expose them to users via a PaymentReceived event. HTLCs which do not meet the requirements
+       /// here are failed when we process them as pending-forwardable-HTLCs, and entries are removed
+       /// after we generate a PaymentReceived upon receipt of all MPP parts or when they time out.
+       /// Locked *after* channel_state.
+       pending_inbound_payments: Mutex<HashMap<PaymentHash, PendingInboundPayment>>,
+
+       /// The session_priv bytes of outbound payments which are pending resolution.
+       /// The authoritative state of these HTLCs resides either within Channels or ChannelMonitors
+       /// (if the channel has been force-closed), however we track them here to prevent duplicative
+       /// PaymentSent/PaymentFailed events. Specifically, in the case of a duplicative
+       /// update_fulfill_htlc message after a reconnect, we may "claim" a payment twice.
+       /// Additionally, because ChannelMonitors are often not re-serialized after connecting block(s)
+       /// which may generate a claim event, we may receive similar duplicate claim/fail MonitorEvents
+       /// after reloading from disk while replaying blocks against ChannelMonitors.
+       ///
+       /// Locked *after* channel_state.
+       pending_outbound_payments: Mutex<HashSet<[u8; 32]>>,
+
        our_network_key: SecretKey,
        our_network_pubkey: PublicKey,
 
@@ -437,6 +463,11 @@ pub struct ChannelManager<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref,
        /// value increases strictly since we don't assume access to a time source.
        last_node_announcement_serial: AtomicUsize,
 
+       /// The highest block timestamp we've seen, which is usually a good guess at the current time.
+       /// Assuming most miners are generating blocks with reasonable timestamps, this shouldn't be
+       /// very far in the past, and can only ever be up to two hours in the future.
+       highest_seen_timestamp: AtomicUsize,
+
        /// The bulk of our storage will eventually be here (channels and message queues and the like).
        /// If we are connected to a peer we always at least have an entry here, even if no channels
        /// are currently open with that peer.
@@ -451,8 +482,8 @@ pub struct ChannelManager<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref,
        /// Essentially just when we're serializing ourselves out.
        /// Taken first everywhere where we are making changes before any other locks.
        /// When acquiring this lock in read mode, rather than acquiring it directly, call
-       /// `PersistenceNotifierGuard::new(..)` and pass the lock to it, to ensure the PersistenceNotifier
-       /// the lock contains sends out a notification when the lock is released.
+       /// `PersistenceNotifierGuard::notify_on_drop(..)` and pass the lock to it, to ensure the
+       /// PersistenceNotifier the lock contains sends out a notification when the lock is released.
        total_consistency_lock: RwLock<()>,
 
        persistence_notifier: PersistenceNotifier,
@@ -467,6 +498,7 @@ pub struct ChannelManager<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref,
 /// Typically, the block-specific parameters are derived from the best block hash for the network,
 /// as a newly constructed `ChannelManager` will not have created any channels yet. These parameters
 /// are not needed when deserializing a previously constructed `ChannelManager`.
+#[derive(Clone, Copy, PartialEq)]
 pub struct ChainParameters {
        /// The network for determining the `chain_hash` in Lightning messages.
        pub network: Network,
@@ -478,7 +510,7 @@ pub struct ChainParameters {
 }
 
 /// The best known block as identified by its hash and height.
-#[derive(Clone, Copy)]
+#[derive(Clone, Copy, PartialEq)]
 pub struct BestBlock {
        block_hash: BlockHash,
        height: u32,
@@ -505,32 +537,50 @@ impl BestBlock {
        pub fn height(&self) -> u32 { self.height }
 }
 
+#[derive(Copy, Clone, PartialEq)]
+enum NotifyOption {
+       DoPersist,
+       SkipPersist,
+}
+
 /// Whenever we release the `ChannelManager`'s `total_consistency_lock`, from read mode, it is
 /// desirable to notify any listeners on `await_persistable_update_timeout`/
-/// `await_persistable_update` that new updates are available for persistence. Therefore, this
+/// `await_persistable_update` when new updates are available for persistence. Therefore, this
 /// struct is responsible for locking the total consistency lock and, upon going out of scope,
 /// sending the aforementioned notification (since the lock being released indicates that the
 /// updates are ready for persistence).
-struct PersistenceNotifierGuard<'a> {
+///
+/// We allow callers to either always notify by constructing with `notify_on_drop` or choose to
+/// notify or not based on whether relevant changes have been made, providing a closure to
+/// `optionally_notify` which returns a `NotifyOption`.
+struct PersistenceNotifierGuard<'a, F: Fn() -> NotifyOption> {
        persistence_notifier: &'a PersistenceNotifier,
+       should_persist: F,
        // We hold onto this result so the lock doesn't get released immediately.
        _read_guard: RwLockReadGuard<'a, ()>,
 }
 
-impl<'a> PersistenceNotifierGuard<'a> {
-       fn new(lock: &'a RwLock<()>, notifier: &'a PersistenceNotifier) -> Self {
+impl<'a> PersistenceNotifierGuard<'a, fn() -> NotifyOption> { // We don't care what the concrete F is here, it's unused
+       fn notify_on_drop(lock: &'a RwLock<()>, notifier: &'a PersistenceNotifier) -> PersistenceNotifierGuard<'a, impl Fn() -> NotifyOption> {
+               PersistenceNotifierGuard::optionally_notify(lock, notifier, || -> NotifyOption { NotifyOption::DoPersist })
+       }
+
+       fn optionally_notify<F: Fn() -> NotifyOption>(lock: &'a RwLock<()>, notifier: &'a PersistenceNotifier, persist_check: F) -> PersistenceNotifierGuard<'a, F> {
                let read_guard = lock.read().unwrap();
 
-               Self {
+               PersistenceNotifierGuard {
                        persistence_notifier: notifier,
+                       should_persist: persist_check,
                        _read_guard: read_guard,
                }
        }
 }
 
-impl<'a> Drop for PersistenceNotifierGuard<'a> {
+impl<'a, F: Fn() -> NotifyOption> Drop for PersistenceNotifierGuard<'a, F> {
        fn drop(&mut self) {
-               self.persistence_notifier.notify();
+               if (self.should_persist)() == NotifyOption::DoPersist {
+                       self.persistence_notifier.notify();
+               }
        }
 }
 
@@ -546,7 +596,7 @@ pub const BREAKDOWN_TIMEOUT: u16 = 6 * 24;
 pub(crate) const MAX_LOCAL_BREAKDOWN_TIMEOUT: u16 = 2 * 6 * 24 * 7;
 
 /// The minimum number of blocks between an inbound HTLC's CLTV and the corresponding outbound
-/// HTLC's CLTV. The current default represents roughly six hours of blocks at six blocks/hour.
+/// HTLC's CLTV. The current default represents roughly seven hours of blocks at six blocks/hour.
 ///
 /// This can be increased (but not decreased) through [`ChannelConfig::cltv_expiry_delta`]
 ///
@@ -555,9 +605,17 @@ pub(crate) const MAX_LOCAL_BREAKDOWN_TIMEOUT: u16 = 2 * 6 * 24 * 7;
 // i.e. the node we forwarded the payment on to should always have enough room to reliably time out
 // the HTLC via a full update_fail_htlc/commitment_signed dance before we hit the
 // CLTV_CLAIM_BUFFER point (we static assert that it's at least 3 blocks more).
-pub const MIN_CLTV_EXPIRY_DELTA: u16 = 6 * 6;
+pub const MIN_CLTV_EXPIRY_DELTA: u16 = 6*7;
 pub(super) const CLTV_FAR_FAR_AWAY: u32 = 6 * 24 * 7; //TODO?
 
+/// Minimum CLTV difference between the current block height and received inbound payments.
+/// Invoices generated for payment to us must set their `min_final_cltv_expiry` field to at least
+/// this value.
+// Note that we fail if exactly HTLC_FAIL_BACK_BUFFER + 1 was used, so we need to add one for
+// any payments to succeed. Further, we don't want payments to fail if a block was found while
+// a payment was being routed, so we add an extra block to be safe.
+pub const MIN_FINAL_CLTV_EXPIRY: u32 = HTLC_FAIL_BACK_BUFFER + 3;
+
 // Check that our CLTV_EXPIRY is at least CLTV_CLAIM_BUFFER + ANTI_REORG_DELAY + LATENCY_GRACE_PERIOD_BLOCKS,
 // ie that if the next-hop peer fails the HTLC within
 // LATENCY_GRACE_PERIOD_BLOCKS then we'll still have CLTV_CLAIM_BUFFER left to timeout it onchain,
@@ -568,7 +626,7 @@ pub(super) const CLTV_FAR_FAR_AWAY: u32 = 6 * 24 * 7; //TODO?
 #[allow(dead_code)]
 const CHECK_CLTV_EXPIRY_SANITY: u32 = MIN_CLTV_EXPIRY_DELTA as u32 - LATENCY_GRACE_PERIOD_BLOCKS - CLTV_CLAIM_BUFFER - ANTI_REORG_DELAY - LATENCY_GRACE_PERIOD_BLOCKS;
 
-// Check for ability of an attacker to make us fail on-chain by delaying inbound claim. See
+// Check for ability of an attacker to make us fail on-chain by delaying an HTLC claim. See
 // ChannelMontior::would_broadcast_at_height for a description of why this is needed.
 #[deny(const_err)]
 #[allow(dead_code)]
@@ -582,6 +640,12 @@ pub struct ChannelDetails {
        /// Note that this means this value is *not* persistent - it can change once during the
        /// lifetime of the channel.
        pub channel_id: [u8; 32],
+       /// The Channel's funding transaction output, if we've negotiated the funding transaction with
+       /// our counterparty already.
+       ///
+       /// Note that, if this has been set, `channel_id` will be equivalent to
+       /// `funding_txo.unwrap().to_channel_id()`.
+       pub funding_txo: Option<OutPoint>,
        /// The position of the funding transaction in the chain. None if the funding transaction has
        /// not yet been confirmed and the channel fully opened.
        pub short_channel_id: Option<u64>,
@@ -606,10 +670,20 @@ pub struct ChannelDetails {
        /// Note that there are some corner cases not fully handled here, so the actual available
        /// inbound capacity may be slightly higher than this.
        pub inbound_capacity_msat: u64,
+       /// True if the channel was initiated (and thus funded) by us.
+       pub is_outbound: bool,
+       /// True if the channel is confirmed, funding_locked messages have been exchanged, and the
+       /// channel is not currently being shut down. `funding_locked` message exchange implies the
+       /// required confirmation count has been reached (and we were connected to the peer at some
+       /// point after the funding transaction received enough confirmations).
+       pub is_funding_locked: bool,
        /// True if the channel is (a) confirmed and funding_locked messages have been exchanged, (b)
-       /// the peer is connected, and (c) no monitor update failure is pending resolution.
-       pub is_live: bool,
-
+       /// the peer is connected, and (c) the channel is not currently negotiating a shutdown.
+       ///
+       /// This is a strict superset of `is_funding_locked`.
+       pub is_usable: bool,
+       /// True if this channel is (or will be) publicly-announced.
+       pub is_public: bool,
        /// Information on the fees and requirements that the counterparty requires when forwarding
        /// payments to us through this channel.
        pub counterparty_forwarding_info: Option<CounterpartyForwardingInfo>,
@@ -694,22 +768,44 @@ macro_rules! handle_error {
        }
 }
 
+/// Returns (boolean indicating if we should remove the Channel object from memory, a mapped error)
+macro_rules! convert_chan_err {
+       ($self: ident, $err: expr, $short_to_id: expr, $channel: expr, $channel_id: expr) => {
+               match $err {
+                       ChannelError::Ignore(msg) => {
+                               (false, MsgHandleErrInternal::from_chan_no_close(ChannelError::Ignore(msg), $channel_id.clone()))
+                       },
+                       ChannelError::Close(msg) => {
+                               log_trace!($self.logger, "Closing channel {} due to close-required error: {}", log_bytes!($channel_id[..]), msg);
+                               if let Some(short_id) = $channel.get_short_channel_id() {
+                                       $short_to_id.remove(&short_id);
+                               }
+                               let shutdown_res = $channel.force_shutdown(true);
+                               (true, MsgHandleErrInternal::from_finish_shutdown(msg, *$channel_id, shutdown_res, $self.get_channel_update(&$channel).ok()))
+                       },
+                       ChannelError::CloseDelayBroadcast(msg) => {
+                               log_error!($self.logger, "Channel {} need to be shutdown but closing transactions not broadcast due to {}", log_bytes!($channel_id[..]), msg);
+                               if let Some(short_id) = $channel.get_short_channel_id() {
+                                       $short_to_id.remove(&short_id);
+                               }
+                               let shutdown_res = $channel.force_shutdown(false);
+                               (true, MsgHandleErrInternal::from_finish_shutdown(msg, *$channel_id, shutdown_res, $self.get_channel_update(&$channel).ok()))
+                       }
+               }
+       }
+}
+
 macro_rules! break_chan_entry {
        ($self: ident, $res: expr, $channel_state: expr, $entry: expr) => {
                match $res {
                        Ok(res) => res,
-                       Err(ChannelError::Ignore(msg)) => {
-                               break Err(MsgHandleErrInternal::from_chan_no_close(ChannelError::Ignore(msg), $entry.key().clone()))
-                       },
-                       Err(ChannelError::Close(msg)) => {
-                               log_trace!($self.logger, "Closing channel {} due to Close-required error: {}", log_bytes!($entry.key()[..]), msg);
-                               let (channel_id, mut chan) = $entry.remove_entry();
-                               if let Some(short_id) = chan.get_short_channel_id() {
-                                       $channel_state.short_to_id.remove(&short_id);
+                       Err(e) => {
+                               let (drop, res) = convert_chan_err!($self, e, $channel_state.short_to_id, $entry.get_mut(), $entry.key());
+                               if drop {
+                                       $entry.remove_entry();
                                }
-                               break Err(MsgHandleErrInternal::from_finish_shutdown(msg, channel_id, chan.force_shutdown(true), $self.get_channel_update(&chan).ok()))
-                       },
-                       Err(ChannelError::CloseDelayBroadcast(_)) => { panic!("Wait is only generated on receipt of channel_reestablish, which is handled by try_chan_entry, we don't bother to support it here"); }
+                               break Err(res);
+                       }
                }
        }
 }
@@ -718,25 +814,12 @@ macro_rules! try_chan_entry {
        ($self: ident, $res: expr, $channel_state: expr, $entry: expr) => {
                match $res {
                        Ok(res) => res,
-                       Err(ChannelError::Ignore(msg)) => {
-                               return Err(MsgHandleErrInternal::from_chan_no_close(ChannelError::Ignore(msg), $entry.key().clone()))
-                       },
-                       Err(ChannelError::Close(msg)) => {
-                               log_trace!($self.logger, "Closing channel {} due to Close-required error: {}", log_bytes!($entry.key()[..]), msg);
-                               let (channel_id, mut chan) = $entry.remove_entry();
-                               if let Some(short_id) = chan.get_short_channel_id() {
-                                       $channel_state.short_to_id.remove(&short_id);
-                               }
-                               return Err(MsgHandleErrInternal::from_finish_shutdown(msg, channel_id, chan.force_shutdown(true), $self.get_channel_update(&chan).ok()))
-                       },
-                       Err(ChannelError::CloseDelayBroadcast(msg)) => {
-                               log_error!($self.logger, "Channel {} need to be shutdown but closing transactions not broadcast due to {}", log_bytes!($entry.key()[..]), msg);
-                               let (channel_id, mut chan) = $entry.remove_entry();
-                               if let Some(short_id) = chan.get_short_channel_id() {
-                                       $channel_state.short_to_id.remove(&short_id);
+                       Err(e) => {
+                               let (drop, res) = convert_chan_err!($self, e, $channel_state.short_to_id, $entry.get_mut(), $entry.key());
+                               if drop {
+                                       $entry.remove_entry();
                                }
-                               let shutdown_res = chan.force_shutdown(false);
-                               return Err(MsgHandleErrInternal::from_finish_shutdown(msg, channel_id, shutdown_res, $self.get_channel_update(&chan).ok()))
+                               return Err(res);
                        }
                }
        }
@@ -746,13 +829,12 @@ macro_rules! handle_monitor_err {
        ($self: ident, $err: expr, $channel_state: expr, $entry: expr, $action_type: path, $resend_raa: expr, $resend_commitment: expr) => {
                handle_monitor_err!($self, $err, $channel_state, $entry, $action_type, $resend_raa, $resend_commitment, Vec::new(), Vec::new())
        };
-       ($self: ident, $err: expr, $channel_state: expr, $entry: expr, $action_type: path, $resend_raa: expr, $resend_commitment: expr, $failed_forwards: expr, $failed_fails: expr) => {
+       ($self: ident, $err: expr, $short_to_id: expr, $chan: expr, $action_type: path, $resend_raa: expr, $resend_commitment: expr, $failed_forwards: expr, $failed_fails: expr, $chan_id: expr) => {
                match $err {
                        ChannelMonitorUpdateErr::PermanentFailure => {
-                               log_error!($self.logger, "Closing channel {} due to monitor update PermanentFailure", log_bytes!($entry.key()[..]));
-                               let (channel_id, mut chan) = $entry.remove_entry();
-                               if let Some(short_id) = chan.get_short_channel_id() {
-                                       $channel_state.short_to_id.remove(&short_id);
+                               log_error!($self.logger, "Closing channel {} due to monitor update ChannelMonitorUpdateErr::PermanentFailure", log_bytes!($chan_id[..]));
+                               if let Some(short_id) = $chan.get_short_channel_id() {
+                                       $short_to_id.remove(&short_id);
                                }
                                // TODO: $failed_fails is dropped here, which will cause other channels to hit the
                                // chain in a confused state! We need to move them into the ChannelMonitor which
@@ -763,12 +845,12 @@ macro_rules! handle_monitor_err {
                                // splitting hairs we'd prefer to claim payments that were to us, but we haven't
                                // given up the preimage yet, so might as well just wait until the payment is
                                // retried, avoiding the on-chain fees.
-                               let res: Result<(), _> = Err(MsgHandleErrInternal::from_finish_shutdown("ChannelMonitor storage failure".to_owned(), channel_id, chan.force_shutdown(true), $self.get_channel_update(&chan).ok()));
-                               res
+                               let res: Result<(), _> = Err(MsgHandleErrInternal::from_finish_shutdown("ChannelMonitor storage failure".to_owned(), *$chan_id, $chan.force_shutdown(true), $self.get_channel_update(&$chan).ok()));
+                               (res, true)
                        },
                        ChannelMonitorUpdateErr::TemporaryFailure => {
                                log_info!($self.logger, "Disabling channel {} due to monitor update TemporaryFailure. On restore will send {} and process {} forwards and {} fails",
-                                               log_bytes!($entry.key()[..]),
+                                               log_bytes!($chan_id[..]),
                                                if $resend_commitment && $resend_raa {
                                                                match $action_type {
                                                                        RAACommitmentOrder::CommitmentFirst => { "commitment then RAA" },
@@ -785,11 +867,18 @@ macro_rules! handle_monitor_err {
                                if !$resend_raa {
                                        debug_assert!($action_type == RAACommitmentOrder::CommitmentFirst || !$resend_commitment);
                                }
-                               $entry.get_mut().monitor_update_failed($resend_raa, $resend_commitment, $failed_forwards, $failed_fails);
-                               Err(MsgHandleErrInternal::from_chan_no_close(ChannelError::Ignore("Failed to update ChannelMonitor".to_owned()), *$entry.key()))
+                               $chan.monitor_update_failed($resend_raa, $resend_commitment, $failed_forwards, $failed_fails);
+                               (Err(MsgHandleErrInternal::from_chan_no_close(ChannelError::Ignore("Failed to update ChannelMonitor".to_owned()), *$chan_id)), false)
                        },
                }
-       }
+       };
+       ($self: ident, $err: expr, $channel_state: expr, $entry: expr, $action_type: path, $resend_raa: expr, $resend_commitment: expr, $failed_forwards: expr, $failed_fails: expr) => { {
+               let (res, drop) = handle_monitor_err!($self, $err, $channel_state.short_to_id, $entry.get_mut(), $action_type, $resend_raa, $resend_commitment, $failed_forwards, $failed_fails, $entry.key());
+               if drop {
+                       $entry.remove_entry();
+               }
+               res
+       } };
 }
 
 macro_rules! return_monitor_err {
@@ -813,6 +902,133 @@ macro_rules! maybe_break_monitor_err {
        }
 }
 
+macro_rules! handle_chan_restoration_locked {
+       ($self: ident, $channel_lock: expr, $channel_state: expr, $channel_entry: expr,
+        $raa: expr, $commitment_update: expr, $order: expr, $chanmon_update: expr,
+        $pending_forwards: expr, $funding_broadcastable: expr, $funding_locked: expr) => { {
+               let mut htlc_forwards = None;
+               let counterparty_node_id = $channel_entry.get().get_counterparty_node_id();
+
+               let chanmon_update: Option<ChannelMonitorUpdate> = $chanmon_update; // Force type-checking to resolve
+               let chanmon_update_is_none = chanmon_update.is_none();
+               let res = loop {
+                       let forwards: Vec<(PendingHTLCInfo, u64)> = $pending_forwards; // Force type-checking to resolve
+                       if !forwards.is_empty() {
+                               htlc_forwards = Some(($channel_entry.get().get_short_channel_id().expect("We can't have pending forwards before funding confirmation"),
+                                       $channel_entry.get().get_funding_txo().unwrap(), forwards));
+                       }
+
+                       if chanmon_update.is_some() {
+                               // On reconnect, we, by definition, only resend a funding_locked if there have been
+                               // no commitment updates, so the only channel monitor update which could also be
+                               // associated with a funding_locked would be the funding_created/funding_signed
+                               // monitor update. That monitor update failing implies that we won't send
+                               // funding_locked until it's been updated, so we can't have a funding_locked and a
+                               // monitor update here (so we don't bother to handle it correctly below).
+                               assert!($funding_locked.is_none());
+                               // A channel monitor update makes no sense without either a funding_locked or a
+                               // commitment update to process after it. Since we can't have a funding_locked, we
+                               // only bother to handle the monitor-update + commitment_update case below.
+                               assert!($commitment_update.is_some());
+                       }
+
+                       if let Some(msg) = $funding_locked {
+                               // Similar to the above, this implies that we're letting the funding_locked fly
+                               // before it should be allowed to.
+                               assert!(chanmon_update.is_none());
+                               $channel_state.pending_msg_events.push(events::MessageSendEvent::SendFundingLocked {
+                                       node_id: counterparty_node_id,
+                                       msg,
+                               });
+                               if let Some(announcement_sigs) = $self.get_announcement_sigs($channel_entry.get()) {
+                                       $channel_state.pending_msg_events.push(events::MessageSendEvent::SendAnnouncementSignatures {
+                                               node_id: counterparty_node_id,
+                                               msg: announcement_sigs,
+                                       });
+                               }
+                               $channel_state.short_to_id.insert($channel_entry.get().get_short_channel_id().unwrap(), $channel_entry.get().channel_id());
+                       }
+
+                       let funding_broadcastable: Option<Transaction> = $funding_broadcastable; // Force type-checking to resolve
+                       if let Some(monitor_update) = chanmon_update {
+                               // We only ever broadcast a funding transaction in response to a funding_signed
+                               // message and the resulting monitor update. Thus, on channel_reestablish
+                               // message handling we can't have a funding transaction to broadcast. When
+                               // processing a monitor update finishing resulting in a funding broadcast, we
+                               // cannot have a second monitor update, thus this case would indicate a bug.
+                               assert!(funding_broadcastable.is_none());
+                               // Given we were just reconnected or finished updating a channel monitor, the
+                               // only case where we can get a new ChannelMonitorUpdate would be if we also
+                               // have some commitment updates to send as well.
+                               assert!($commitment_update.is_some());
+                               if let Err(e) = $self.chain_monitor.update_channel($channel_entry.get().get_funding_txo().unwrap(), monitor_update) {
+                                       // channel_reestablish doesn't guarantee the order it returns is sensical
+                                       // for the messages it returns, but if we're setting what messages to
+                                       // re-transmit on monitor update success, we need to make sure it is sane.
+                                       let mut order = $order;
+                                       if $raa.is_none() {
+                                               order = RAACommitmentOrder::CommitmentFirst;
+                                       }
+                                       break handle_monitor_err!($self, e, $channel_state, $channel_entry, order, $raa.is_some(), true);
+                               }
+                       }
+
+                       macro_rules! handle_cs { () => {
+                               if let Some(update) = $commitment_update {
+                                       $channel_state.pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs {
+                                               node_id: counterparty_node_id,
+                                               updates: update,
+                                       });
+                               }
+                       } }
+                       macro_rules! handle_raa { () => {
+                               if let Some(revoke_and_ack) = $raa {
+                                       $channel_state.pending_msg_events.push(events::MessageSendEvent::SendRevokeAndACK {
+                                               node_id: counterparty_node_id,
+                                               msg: revoke_and_ack,
+                                       });
+                               }
+                       } }
+                       match $order {
+                               RAACommitmentOrder::CommitmentFirst => {
+                                       handle_cs!();
+                                       handle_raa!();
+                               },
+                               RAACommitmentOrder::RevokeAndACKFirst => {
+                                       handle_raa!();
+                                       handle_cs!();
+                               },
+                       }
+                       if let Some(tx) = funding_broadcastable {
+                               log_info!($self.logger, "Broadcasting funding transaction with txid {}", tx.txid());
+                               $self.tx_broadcaster.broadcast_transaction(&tx);
+                       }
+                       break Ok(());
+               };
+
+               if chanmon_update_is_none {
+                       // If there was no ChannelMonitorUpdate, we should never generate an Err in the res loop
+                       // above. Doing so would imply calling handle_err!() from channel_monitor_updated() which
+                       // should *never* end up calling back to `chain_monitor.update_channel()`.
+                       assert!(res.is_ok());
+               }
+
+               (htlc_forwards, res, counterparty_node_id)
+       } }
+}
+
+macro_rules! post_handle_chan_restoration {
+       ($self: ident, $locked_res: expr) => { {
+               let (htlc_forwards, res, counterparty_node_id) = $locked_res;
+
+               let _ = handle_error!($self, res, counterparty_node_id);
+
+               if let Some(forwards) = htlc_forwards {
+                       $self.forward_htlcs(&mut [forwards][..]);
+               }
+       } }
+}
+
 impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<Signer, M, T, K, F, L>
        where M::Target: chain::Watch<Signer>,
         T::Target: BroadcasterInterface,
@@ -852,11 +1068,15 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                claimable_htlcs: HashMap::new(),
                                pending_msg_events: Vec::new(),
                        }),
+                       pending_inbound_payments: Mutex::new(HashMap::new()),
+                       pending_outbound_payments: Mutex::new(HashSet::new()),
+
                        our_network_key: keys_manager.get_node_secret(),
                        our_network_pubkey: PublicKey::from_secret_key(&secp_ctx, &keys_manager.get_node_secret()),
                        secp_ctx,
 
                        last_node_announcement_serial: AtomicUsize::new(0),
+                       highest_seen_timestamp: AtomicUsize::new(0),
 
                        per_peer_state: RwLock::new(HashMap::new()),
 
@@ -898,7 +1118,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                let channel = Channel::new_outbound(&self.fee_estimator, &self.keys_manager, their_network_key, channel_value_satoshis, push_msat, user_id, config)?;
                let res = channel.get_open_channel(self.genesis_hash.clone());
 
-               let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier);
+               let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);
                // We want to make sure the lock is actually acquired by PersistenceNotifierGuard.
                debug_assert!(&self.total_consistency_lock.try_write().is_err());
 
@@ -929,6 +1149,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                let (inbound_capacity_msat, outbound_capacity_msat) = channel.get_inbound_outbound_available_balance_msat();
                                res.push(ChannelDetails {
                                        channel_id: (*channel_id).clone(),
+                                       funding_txo: channel.get_funding_txo(),
                                        short_channel_id: channel.get_short_channel_id(),
                                        remote_network_id: channel.get_counterparty_node_id(),
                                        counterparty_features: InitFeatures::empty(),
@@ -936,7 +1157,10 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                        inbound_capacity_msat,
                                        outbound_capacity_msat,
                                        user_id: channel.get_user_id(),
-                                       is_live: channel.is_live(),
+                                       is_outbound: channel.is_outbound(),
+                                       is_funding_locked: channel.is_usable(),
+                                       is_usable: channel.is_live(),
+                                       is_public: channel.should_announce(),
                                        counterparty_forwarding_info: channel.counterparty_forwarding_info(),
                                });
                        }
@@ -959,8 +1183,9 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
        /// Gets the list of usable channels, in random order. Useful as an argument to
        /// get_route to ensure non-announced channels are used.
        ///
-       /// These are guaranteed to have their is_live value set to true, see the documentation for
-       /// ChannelDetails::is_live for more info on exactly what the criteria are.
+       /// These are guaranteed to have their [`ChannelDetails::is_usable`] value set to true, see the
+       /// documentation for [`ChannelDetails::is_usable`] for more info on exactly what the criteria
+       /// are.
        pub fn list_usable_channels(&self) -> Vec<ChannelDetails> {
                // Note we use is_live here instead of usable which leads to somewhat confused
                // internal/external nomenclature, but that's ok cause that's probably what the user
@@ -974,7 +1199,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
        ///
        /// May generate a SendShutdown message event on success, which should be relayed.
        pub fn close_channel(&self, channel_id: &[u8; 32]) -> Result<(), APIError> {
-               let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier);
+               let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);
 
                let (mut failed_htlcs, chan_option) = {
                        let mut channel_state_lock = self.channel_state.lock().unwrap();
@@ -1064,7 +1289,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
        /// Force closes a channel, immediately broadcasting the latest local commitment transaction to
        /// the chain and rejecting new HTLCs on the given channel. Fails if channel_id is unknown to the manager.
        pub fn force_close_channel(&self, channel_id: &[u8; 32]) -> Result<(), APIError> {
-               let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier);
+               let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);
                match self.force_close_channel_with_peer(channel_id, None) {
                        Ok(counterparty_node_id) => {
                                self.channel_state.lock().unwrap().pending_msg_events.push(
@@ -1214,6 +1439,10 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                        msgs::OnionHopDataFormat::FinalNode { payment_data } => payment_data,
                                };
 
+                               if payment_data.is_none() {
+                                       return_err!("We require payment_secrets", 0x4000|0x2000|3, &[0;0]);
+                               }
+
                                // Note that we could obviously respond immediately with an update_fulfill_htlc
                                // message, however that would leak that we are the recipient of this payment, so
                                // instead we stay symmetric with the forwarding case, only responding (after a
@@ -1221,7 +1450,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
 
                                PendingHTLCStatus::Forward(PendingHTLCInfo {
                                        routing: PendingHTLCRouting::Receive {
-                                               payment_data,
+                                               payment_data: payment_data.unwrap(),
                                                incoming_cltv_expiry: msg.cltv_expiry,
                                        },
                                        payment_hash: msg.payment_hash.clone(),
@@ -1395,7 +1624,8 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
        pub(crate) fn send_payment_along_path(&self, path: &Vec<RouteHop>, payment_hash: &PaymentHash, payment_secret: &Option<PaymentSecret>, total_value: u64, cur_height: u32) -> Result<(), APIError> {
                log_trace!(self.logger, "Attempting to send payment for path with next hop {}", path.first().unwrap().short_channel_id);
                let prng_seed = self.keys_manager.get_secure_random_bytes();
-               let session_priv = SecretKey::from_slice(&self.keys_manager.get_secure_random_bytes()[..]).expect("RNG is busted");
+               let session_priv_bytes = self.keys_manager.get_secure_random_bytes();
+               let session_priv = SecretKey::from_slice(&session_priv_bytes[..]).expect("RNG is busted");
 
                let onion_keys = onion_utils::construct_onion_keys(&self.secp_ctx, &path, &session_priv)
                        .map_err(|_| APIError::RouteError{err: "Pubkey along hop was maliciously selected"})?;
@@ -1405,7 +1635,8 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                }
                let onion_packet = onion_utils::construct_onion_packet(onion_payloads, onion_keys, prng_seed, payment_hash);
 
-               let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier);
+               let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);
+               assert!(self.pending_outbound_payments.lock().unwrap().insert(session_priv_bytes));
 
                let err: Result<(), _> = loop {
                        let mut channel_lock = self.channel_state.lock().unwrap();
@@ -1563,61 +1794,14 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                }
        }
 
-       /// Call this upon creation of a funding transaction for the given channel.
-       ///
-       /// Returns an [`APIError::APIMisuseError`] if the funding_transaction spent non-SegWit outputs
-       /// or if no output was found which matches the parameters in [`Event::FundingGenerationReady`].
-       ///
-       /// Panics if a funding transaction has already been provided for this channel.
-       ///
-       /// May panic if the output found in the funding transaction is duplicative with some other
-       /// channel (note that this should be trivially prevented by using unique funding transaction
-       /// keys per-channel).
-       ///
-       /// Do NOT broadcast the funding transaction yourself. When we have safely received our
-       /// counterparty's signature the funding transaction will automatically be broadcast via the
-       /// [`BroadcasterInterface`] provided when this `ChannelManager` was constructed.
-       ///
-       /// Note that this includes RBF or similar transaction replacement strategies - lightning does
-       /// not currently support replacing a funding transaction on an existing channel. Instead,
-       /// create a new channel with a conflicting funding transaction.
-       pub fn funding_transaction_generated(&self, temporary_channel_id: &[u8; 32], funding_transaction: Transaction) -> Result<(), APIError> {
-               let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier);
-
-               for inp in funding_transaction.input.iter() {
-                       if inp.witness.is_empty() {
-                               return Err(APIError::APIMisuseError {
-                                       err: "Funding transaction must be fully signed and spend Segwit outputs".to_owned()
-                               });
-                       }
-               }
-
+       /// Handles the generation of a funding transaction, optionally (for tests) with a function
+       /// which checks the correctness of the funding transaction given the associated channel.
+       fn funding_transaction_generated_intern<FundingOutput: Fn(&Channel<Signer>, &Transaction) -> Result<OutPoint, APIError>>
+                       (&self, temporary_channel_id: &[u8; 32], funding_transaction: Transaction, find_funding_output: FundingOutput) -> Result<(), APIError> {
                let (chan, msg) = {
                        let (res, chan) = match self.channel_state.lock().unwrap().by_id.remove(temporary_channel_id) {
                                Some(mut chan) => {
-                                       let mut output_index = None;
-                                       let expected_spk = chan.get_funding_redeemscript().to_v0_p2wsh();
-                                       for (idx, outp) in funding_transaction.output.iter().enumerate() {
-                                               if outp.script_pubkey == expected_spk && outp.value == chan.get_value_satoshis() {
-                                                       if output_index.is_some() {
-                                                               return Err(APIError::APIMisuseError {
-                                                                       err: "Multiple outputs matched the expected script and value".to_owned()
-                                                               });
-                                                       }
-                                                       if idx > u16::max_value() as usize {
-                                                               return Err(APIError::APIMisuseError {
-                                                                       err: "Transaction had more than 2^16 outputs, which is not supported".to_owned()
-                                                               });
-                                                       }
-                                                       output_index = Some(idx as u16);
-                                               }
-                                       }
-                                       if output_index.is_none() {
-                                               return Err(APIError::APIMisuseError {
-                                                       err: "No output matched the script_pubkey and value in the FundingGenerationReady event".to_owned()
-                                               });
-                                       }
-                                       let funding_txo = OutPoint { txid: funding_transaction.txid(), index: output_index.unwrap() };
+                                       let funding_txo = find_funding_output(&chan, &funding_transaction)?;
 
                                        (chan.get_outbound_funding_created(funding_transaction, funding_txo, &self.logger)
                                                .map_err(|e| if let ChannelError::Close(msg) = e {
@@ -1653,6 +1837,70 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                Ok(())
        }
 
+       #[cfg(test)]
+       pub(crate) fn funding_transaction_generated_unchecked(&self, temporary_channel_id: &[u8; 32], funding_transaction: Transaction, output_index: u16) -> Result<(), APIError> {
+               self.funding_transaction_generated_intern(temporary_channel_id, funding_transaction, |_, tx| {
+                       Ok(OutPoint { txid: tx.txid(), index: output_index })
+               })
+       }
+
+       /// Call this upon creation of a funding transaction for the given channel.
+       ///
+       /// Returns an [`APIError::APIMisuseError`] if the funding_transaction spent non-SegWit outputs
+       /// or if no output was found which matches the parameters in [`Event::FundingGenerationReady`].
+       ///
+       /// Panics if a funding transaction has already been provided for this channel.
+       ///
+       /// May panic if the output found in the funding transaction is duplicative with some other
+       /// channel (note that this should be trivially prevented by using unique funding transaction
+       /// keys per-channel).
+       ///
+       /// Do NOT broadcast the funding transaction yourself. When we have safely received our
+       /// counterparty's signature the funding transaction will automatically be broadcast via the
+       /// [`BroadcasterInterface`] provided when this `ChannelManager` was constructed.
+       ///
+       /// Note that this includes RBF or similar transaction replacement strategies - lightning does
+       /// not currently support replacing a funding transaction on an existing channel. Instead,
+       /// create a new channel with a conflicting funding transaction.
+       ///
+       /// [`Event::FundingGenerationReady`]: crate::util::events::Event::FundingGenerationReady
+       pub fn funding_transaction_generated(&self, temporary_channel_id: &[u8; 32], funding_transaction: Transaction) -> Result<(), APIError> {
+               let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);
+
+               for inp in funding_transaction.input.iter() {
+                       if inp.witness.is_empty() {
+                               return Err(APIError::APIMisuseError {
+                                       err: "Funding transaction must be fully signed and spend Segwit outputs".to_owned()
+                               });
+                       }
+               }
+               self.funding_transaction_generated_intern(temporary_channel_id, funding_transaction, |chan, tx| {
+                       let mut output_index = None;
+                       let expected_spk = chan.get_funding_redeemscript().to_v0_p2wsh();
+                       for (idx, outp) in tx.output.iter().enumerate() {
+                               if outp.script_pubkey == expected_spk && outp.value == chan.get_value_satoshis() {
+                                       if output_index.is_some() {
+                                               return Err(APIError::APIMisuseError {
+                                                       err: "Multiple outputs matched the expected script and value".to_owned()
+                                               });
+                                       }
+                                       if idx > u16::max_value() as usize {
+                                               return Err(APIError::APIMisuseError {
+                                                       err: "Transaction had more than 2^16 outputs, which is not supported".to_owned()
+                                               });
+                                       }
+                                       output_index = Some(idx as u16);
+                               }
+                       }
+                       if output_index.is_none() {
+                               return Err(APIError::APIMisuseError {
+                                       err: "No output matched the script_pubkey and value in the FundingGenerationReady event".to_owned()
+                               });
+                       }
+                       Ok(OutPoint { txid: tx.txid(), index: output_index.unwrap() })
+               })
+       }
+
        fn get_announcement_sigs(&self, chan: &Channel<Signer>) -> Option<msgs::AnnouncementSignatures> {
                if !chan.should_announce() {
                        log_trace!(self.logger, "Can't send announcement_signatures for private channel {}", log_bytes!(chan.channel_id()));
@@ -1679,33 +1927,42 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
        // be absurd. We ensure this by checking that at least 500 (our stated public contract on when
        // broadcast_node_announcement panics) of the maximum-length addresses would fit in a 64KB
        // message...
-       const HALF_MESSAGE_IS_ADDRS: u32 = ::std::u16::MAX as u32 / (NetAddress::MAX_LEN as u32 + 1) / 2;
+       const HALF_MESSAGE_IS_ADDRS: u32 = ::core::u16::MAX as u32 / (NetAddress::MAX_LEN as u32 + 1) / 2;
        #[deny(const_err)]
        #[allow(dead_code)]
        // ...by failing to compile if the number of addresses that would be half of a message is
        // smaller than 500:
        const STATIC_ASSERT: u32 = Self::HALF_MESSAGE_IS_ADDRS - 500;
 
-       /// Generates a signed node_announcement from the given arguments and creates a
-       /// BroadcastNodeAnnouncement event. Note that such messages will be ignored unless peers have
-       /// seen a channel_announcement from us (ie unless we have public channels open).
+       /// Regenerates channel_announcements and generates a signed node_announcement from the given
+       /// arguments, providing them in corresponding events via
+       /// [`get_and_clear_pending_msg_events`], if at least one public channel has been confirmed
+       /// on-chain. This effectively re-broadcasts all channel announcements and sends our node
+       /// announcement to ensure that the lightning P2P network is aware of the channels we have and
+       /// our network addresses.
        ///
-       /// RGB is a node "color" and alias is a printable human-readable string to describe this node
-       /// to humans. They carry no in-protocol meaning.
+       /// `rgb` is a node "color" and `alias` is a printable human-readable string to describe this
+       /// node to humans. They carry no in-protocol meaning.
        ///
-       /// addresses represent the set (possibly empty) of socket addresses on which this node accepts
-       /// incoming connections. These will be broadcast to the network, publicly tying these
-       /// addresses together. If you wish to preserve user privacy, addresses should likely contain
-       /// only Tor Onion addresses.
+       /// `addresses` represent the set (possibly empty) of socket addresses on which this node
+       /// accepts incoming connections. These will be included in the node_announcement, publicly
+       /// tying these addresses together and to this node. If you wish to preserve user privacy,
+       /// addresses should likely contain only Tor Onion addresses.
        ///
-       /// Panics if addresses is absurdly large (more than 500).
-       pub fn broadcast_node_announcement(&self, rgb: [u8; 3], alias: [u8; 32], addresses: Vec<NetAddress>) {
-               let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier);
+       /// Panics if `addresses` is absurdly large (more than 500).
+       ///
+       /// [`get_and_clear_pending_msg_events`]: MessageSendEventsProvider::get_and_clear_pending_msg_events
+       pub fn broadcast_node_announcement(&self, rgb: [u8; 3], alias: [u8; 32], mut addresses: Vec<NetAddress>) {
+               let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);
 
                if addresses.len() > 500 {
                        panic!("More than half the message size was taken up by public addresses!");
                }
 
+               // While all existing nodes handle unsorted addresses just fine, the spec requires that
+               // addresses be sorted for future compatibility.
+               addresses.sort_by_key(|addr| addr.get_id());
+
                let announcement = msgs::UnsignedNodeAnnouncement {
                        features: NodeFeatures::known(),
                        timestamp: self.last_node_announcement_serial.fetch_add(1, Ordering::AcqRel) as u32,
@@ -1715,14 +1972,37 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                        excess_data: Vec::new(),
                };
                let msghash = hash_to_message!(&Sha256dHash::hash(&announcement.encode()[..])[..]);
+               let node_announce_sig = self.secp_ctx.sign(&msghash, &self.our_network_key);
 
-               let mut channel_state = self.channel_state.lock().unwrap();
-               channel_state.pending_msg_events.push(events::MessageSendEvent::BroadcastNodeAnnouncement {
-                       msg: msgs::NodeAnnouncement {
-                               signature: self.secp_ctx.sign(&msghash, &self.our_network_key),
-                               contents: announcement
-                       },
-               });
+               let mut channel_state_lock = self.channel_state.lock().unwrap();
+               let channel_state = &mut *channel_state_lock;
+
+               let mut announced_chans = false;
+               for (_, chan) in channel_state.by_id.iter() {
+                       if let Some(msg) = chan.get_signed_channel_announcement(&self.our_network_key, self.get_our_node_id(), self.genesis_hash.clone()) {
+                               channel_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelAnnouncement {
+                                       msg,
+                                       update_msg: match self.get_channel_update(chan) {
+                                               Ok(msg) => msg,
+                                               Err(_) => continue,
+                                       },
+                               });
+                               announced_chans = true;
+                       } else {
+                               // If the channel is not public or has not yet reached funding_locked, check the
+                               // next channel. If we don't yet have any public channels, we'll skip the broadcast
+                               // below as peers may not accept it without channels on chain first.
+                       }
+               }
+
+               if announced_chans {
+                       channel_state.pending_msg_events.push(events::MessageSendEvent::BroadcastNodeAnnouncement {
+                               msg: msgs::NodeAnnouncement {
+                                       signature: node_announce_sig,
+                                       contents: announcement
+                               },
+                       });
+               }
        }
 
        /// Processes HTLCs which are pending waiting on random forward delay.
@@ -1730,7 +2010,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
        /// Should only really ever be called in response to a PendingHTLCsForwardable event.
        /// Will likely generate further events.
        pub fn process_pending_htlc_forwards(&self) {
-               let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier);
+               let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);
 
                let mut new_events = Vec::new();
                let mut failed_forwards = Vec::new();
@@ -1821,7 +2101,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                                                },
                                                                HTLCForwardInfo::FailHTLC { htlc_id, err_packet } => {
                                                                        log_trace!(self.logger, "Failing HTLC back to channel with short id {} after delay", short_chan_id);
-                                                                       match chan.get_mut().get_update_fail_htlc(htlc_id, err_packet) {
+                                                                       match chan.get_mut().get_update_fail_htlc(htlc_id, err_packet, &self.logger) {
                                                                                Err(e) => {
                                                                                        if let ChannelError::Ignore(msg) = e {
                                                                                                log_trace!(self.logger, "Failed to fail backwards to short_id {}: {}", short_chan_id, msg);
@@ -1900,61 +2180,95 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                                                        routing: PendingHTLCRouting::Receive { payment_data, incoming_cltv_expiry },
                                                                        incoming_shared_secret, payment_hash, amt_to_forward, .. },
                                                                        prev_funding_outpoint } => {
-                                                               let prev_hop = HTLCPreviousHopData {
-                                                                       short_channel_id: prev_short_channel_id,
-                                                                       outpoint: prev_funding_outpoint,
-                                                                       htlc_id: prev_htlc_id,
-                                                                       incoming_packet_shared_secret: incoming_shared_secret,
-                                                               };
-
-                                                               let mut total_value = 0;
-                                                               let payment_secret_opt =
-                                                                       if let &Some(ref data) = &payment_data { Some(data.payment_secret.clone()) } else { None };
-                                                               let htlcs = channel_state.claimable_htlcs.entry((payment_hash, payment_secret_opt))
-                                                                       .or_insert(Vec::new());
-                                                               htlcs.push(ClaimableHTLC {
-                                                                       prev_hop,
+                                                               let claimable_htlc = ClaimableHTLC {
+                                                                       prev_hop: HTLCPreviousHopData {
+                                                                               short_channel_id: prev_short_channel_id,
+                                                                               outpoint: prev_funding_outpoint,
+                                                                               htlc_id: prev_htlc_id,
+                                                                               incoming_packet_shared_secret: incoming_shared_secret,
+                                                                       },
                                                                        value: amt_to_forward,
                                                                        payment_data: payment_data.clone(),
                                                                        cltv_expiry: incoming_cltv_expiry,
-                                                               });
-                                                               if let &Some(ref data) = &payment_data {
-                                                                       for htlc in htlcs.iter() {
-                                                                               total_value += htlc.value;
-                                                                               if htlc.payment_data.as_ref().unwrap().total_msat != data.total_msat {
-                                                                                       total_value = msgs::MAX_VALUE_MSAT;
-                                                                               }
-                                                                               if total_value >= msgs::MAX_VALUE_MSAT { break; }
-                                                                       }
-                                                                       if total_value >= msgs::MAX_VALUE_MSAT || total_value > data.total_msat  {
-                                                                               for htlc in htlcs.iter() {
-                                                                                       let mut htlc_msat_height_data = byte_utils::be64_to_array(htlc.value).to_vec();
-                                                                                       htlc_msat_height_data.extend_from_slice(
-                                                                                               &byte_utils::be32_to_array(self.best_block.read().unwrap().height()),
-                                                                                       );
-                                                                                       failed_forwards.push((HTLCSource::PreviousHopData(HTLCPreviousHopData {
-                                                                                                       short_channel_id: htlc.prev_hop.short_channel_id,
-                                                                                                       outpoint: prev_funding_outpoint,
-                                                                                                       htlc_id: htlc.prev_hop.htlc_id,
-                                                                                                       incoming_packet_shared_secret: htlc.prev_hop.incoming_packet_shared_secret,
-                                                                                               }), payment_hash,
-                                                                                               HTLCFailReason::Reason { failure_code: 0x4000 | 15, data: htlc_msat_height_data }
-                                                                                       ));
-                                                                               }
-                                                                       } else if total_value == data.total_msat {
-                                                                               new_events.push(events::Event::PaymentReceived {
-                                                                                       payment_hash,
-                                                                                       payment_secret: Some(data.payment_secret),
-                                                                                       amt: total_value,
-                                                                               });
+                                                               };
+
+                                                               macro_rules! fail_htlc {
+                                                                       ($htlc: expr) => {
+                                                                               let mut htlc_msat_height_data = byte_utils::be64_to_array($htlc.value).to_vec();
+                                                                               htlc_msat_height_data.extend_from_slice(
+                                                                                       &byte_utils::be32_to_array(self.best_block.read().unwrap().height()),
+                                                                               );
+                                                                               failed_forwards.push((HTLCSource::PreviousHopData(HTLCPreviousHopData {
+                                                                                               short_channel_id: $htlc.prev_hop.short_channel_id,
+                                                                                               outpoint: prev_funding_outpoint,
+                                                                                               htlc_id: $htlc.prev_hop.htlc_id,
+                                                                                               incoming_packet_shared_secret: $htlc.prev_hop.incoming_packet_shared_secret,
+                                                                                       }), payment_hash,
+                                                                                       HTLCFailReason::Reason { failure_code: 0x4000 | 15, data: htlc_msat_height_data }
+                                                                               ));
                                                                        }
-                                                               } else {
-                                                                       new_events.push(events::Event::PaymentReceived {
-                                                                               payment_hash,
-                                                                               payment_secret: None,
-                                                                               amt: amt_to_forward,
-                                                                       });
                                                                }
+
+                                                               // Check that the payment hash and secret are known. Note that we
+                                                               // MUST take care to handle the "unknown payment hash" and
+                                                               // "incorrect payment secret" cases here identically or we'd expose
+                                                               // that we are the ultimate recipient of the given payment hash.
+                                                               // Further, we must not expose whether we have any other HTLCs
+                                                               // associated with the same payment_hash pending or not.
+                                                               let mut payment_secrets = self.pending_inbound_payments.lock().unwrap();
+                                                               match payment_secrets.entry(payment_hash) {
+                                                                       hash_map::Entry::Vacant(_) => {
+                                                                               log_trace!(self.logger, "Failing new HTLC with payment_hash {} as we didn't have a corresponding inbound payment.", log_bytes!(payment_hash.0));
+                                                                               fail_htlc!(claimable_htlc);
+                                                                       },
+                                                                       hash_map::Entry::Occupied(inbound_payment) => {
+                                                                               if inbound_payment.get().payment_secret != payment_data.payment_secret {
+                                                                                       log_trace!(self.logger, "Failing new HTLC with payment_hash {} as it didn't match our expected payment secret.", log_bytes!(payment_hash.0));
+                                                                                       fail_htlc!(claimable_htlc);
+                                                                               } else if inbound_payment.get().min_value_msat.is_some() && payment_data.total_msat < inbound_payment.get().min_value_msat.unwrap() {
+                                                                                       log_trace!(self.logger, "Failing new HTLC with payment_hash {} as it didn't match our minimum value (had {}, needed {}).",
+                                                                                               log_bytes!(payment_hash.0), payment_data.total_msat, inbound_payment.get().min_value_msat.unwrap());
+                                                                                       fail_htlc!(claimable_htlc);
+                                                                               } else {
+                                                                                       let mut total_value = 0;
+                                                                                       let htlcs = channel_state.claimable_htlcs.entry(payment_hash)
+                                                                                               .or_insert(Vec::new());
+                                                                                       htlcs.push(claimable_htlc);
+                                                                                       for htlc in htlcs.iter() {
+                                                                                               total_value += htlc.value;
+                                                                                               if htlc.payment_data.total_msat != payment_data.total_msat {
+                                                                                                       log_trace!(self.logger, "Failing HTLCs with payment_hash {} as the HTLCs had inconsistent total values (eg {} and {})",
+                                                                                                               log_bytes!(payment_hash.0), payment_data.total_msat, htlc.payment_data.total_msat);
+                                                                                                       total_value = msgs::MAX_VALUE_MSAT;
+                                                                                               }
+                                                                                               if total_value >= msgs::MAX_VALUE_MSAT { break; }
+                                                                                       }
+                                                                                       if total_value >= msgs::MAX_VALUE_MSAT || total_value > payment_data.total_msat {
+                                                                                               log_trace!(self.logger, "Failing HTLCs with payment_hash {} as the total value {} ran over expected value {} (or HTLCs were inconsistent)",
+                                                                                                       log_bytes!(payment_hash.0), total_value, payment_data.total_msat);
+                                                                                               for htlc in htlcs.iter() {
+                                                                                                       fail_htlc!(htlc);
+                                                                                               }
+                                                                                       } else if total_value == payment_data.total_msat {
+                                                                                               new_events.push(events::Event::PaymentReceived {
+                                                                                                       payment_hash,
+                                                                                                       payment_preimage: inbound_payment.get().payment_preimage,
+                                                                                                       payment_secret: payment_data.payment_secret,
+                                                                                                       amt: total_value,
+                                                                                                       user_payment_id: inbound_payment.get().user_payment_id,
+                                                                                               });
+                                                                                               // Only ever generate at most one PaymentReceived
+                                                                                               // per registered payment_hash, even if it isn't
+                                                                                               // claimed.
+                                                                                               inbound_payment.remove_entry();
+                                                                                       } else {
+                                                                                               // Nothing to do - we haven't reached the total
+                                                                                               // payment value yet, wait until we receive more
+                                                                                               // MPP parts.
+                                                                                       }
+                                                                               }
+                                                                       },
+                                                               };
                                                        },
                                                        HTLCForwardInfo::AddHTLC { .. } => {
                                                                panic!("short_channel_id == 0 should imply any pending_forward entries are of type Receive");
@@ -1987,9 +2301,13 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
        /// BroadcastChannelUpdate events in timer_tick_occurred.
        ///
        /// Expects the caller to have a total_consistency_lock read lock.
-       fn process_background_events(&self) {
+       fn process_background_events(&self) -> bool {
                let mut background_events = Vec::new();
                mem::swap(&mut *self.pending_background_events.lock().unwrap(), &mut background_events);
+               if background_events.is_empty() {
+                       return false;
+               }
+
                for event in background_events.drain(..) {
                        match event {
                                BackgroundEvent::ClosingMonitorUpdate((funding_txo, update)) => {
@@ -1999,10 +2317,12 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                },
                        }
                }
+               true
        }
 
        #[cfg(any(test, feature = "_test_utils"))]
-       pub(crate) fn test_process_background_events(&self) {
+       /// Process background events, for functional testing
+       pub fn test_process_background_events(&self) {
                self.process_background_events();
        }
 
@@ -2014,25 +2334,42 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
        ///
        /// Note that in some rare cases this may generate a `chain::Watch::update_channel` call.
        pub fn timer_tick_occurred(&self) {
-               let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier);
-               self.process_background_events();
+               PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock, &self.persistence_notifier, || {
+                       let mut should_persist = NotifyOption::SkipPersist;
+                       if self.process_background_events() { should_persist = NotifyOption::DoPersist; }
 
-               let mut channel_state_lock = self.channel_state.lock().unwrap();
-               let channel_state = &mut *channel_state_lock;
-               for (_, chan) in channel_state.by_id.iter_mut() {
-                       if chan.is_disabled_staged() && !chan.is_live() {
-                               if let Ok(update) = self.get_channel_update(&chan) {
-                                       channel_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
-                                               msg: update
-                                       });
+                       let mut channel_state_lock = self.channel_state.lock().unwrap();
+                       let channel_state = &mut *channel_state_lock;
+                       for (_, chan) in channel_state.by_id.iter_mut() {
+                               match chan.channel_update_status() {
+                                       ChannelUpdateStatus::Enabled if !chan.is_live() => chan.set_channel_update_status(ChannelUpdateStatus::DisabledStaged),
+                                       ChannelUpdateStatus::Disabled if chan.is_live() => chan.set_channel_update_status(ChannelUpdateStatus::EnabledStaged),
+                                       ChannelUpdateStatus::DisabledStaged if chan.is_live() => chan.set_channel_update_status(ChannelUpdateStatus::Enabled),
+                                       ChannelUpdateStatus::EnabledStaged if !chan.is_live() => chan.set_channel_update_status(ChannelUpdateStatus::Disabled),
+                                       ChannelUpdateStatus::DisabledStaged if !chan.is_live() => {
+                                               if let Ok(update) = self.get_channel_update(&chan) {
+                                                       channel_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
+                                                               msg: update
+                                                       });
+                                               }
+                                               should_persist = NotifyOption::DoPersist;
+                                               chan.set_channel_update_status(ChannelUpdateStatus::Disabled);
+                                       },
+                                       ChannelUpdateStatus::EnabledStaged if chan.is_live() => {
+                                               if let Ok(update) = self.get_channel_update(&chan) {
+                                                       channel_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
+                                                               msg: update
+                                                       });
+                                               }
+                                               should_persist = NotifyOption::DoPersist;
+                                               chan.set_channel_update_status(ChannelUpdateStatus::Enabled);
+                                       },
+                                       _ => {},
                                }
-                               chan.to_fresh();
-                       } else if chan.is_disabled_staged() && chan.is_live() {
-                               chan.to_fresh();
-                       } else if chan.is_disabled_marked() {
-                               chan.to_disabled_staged();
                        }
-               }
+
+                       should_persist
+               });
        }
 
        /// Indicates that the preimage for payment_hash is unknown or the received amount is incorrect
@@ -2040,11 +2377,11 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
        /// along the path (including in our own channel on which we received it).
        /// Returns false if no payment was found to fail backwards, true if the process of failing the
        /// HTLC backwards has been started.
-       pub fn fail_htlc_backwards(&self, payment_hash: &PaymentHash, payment_secret: &Option<PaymentSecret>) -> bool {
-               let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier);
+       pub fn fail_htlc_backwards(&self, payment_hash: &PaymentHash) -> bool {
+               let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);
 
                let mut channel_state = Some(self.channel_state.lock().unwrap());
-               let removed_source = channel_state.as_mut().unwrap().claimable_htlcs.remove(&(*payment_hash, *payment_secret));
+               let removed_source = channel_state.as_mut().unwrap().claimable_htlcs.remove(payment_hash);
                if let Some(mut sources) = removed_source {
                        for htlc in sources.drain(..) {
                                if channel_state.is_none() { channel_state = Some(self.channel_state.lock().unwrap()); }
@@ -2081,17 +2418,25 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                        self.fail_htlc_backwards_internal(channel_state,
                                                htlc_src, &payment_hash, HTLCFailReason::Reason { failure_code, data: onion_failure_data});
                                },
-                               HTLCSource::OutboundRoute { .. } => {
-                                       self.pending_events.lock().unwrap().push(
-                                               events::Event::PaymentFailed {
-                                                       payment_hash,
-                                                       rejected_by_dest: false,
+                               HTLCSource::OutboundRoute { session_priv, .. } => {
+                                       if {
+                                               let mut session_priv_bytes = [0; 32];
+                                               session_priv_bytes.copy_from_slice(&session_priv[..]);
+                                               self.pending_outbound_payments.lock().unwrap().remove(&session_priv_bytes)
+                                       } {
+                                               self.pending_events.lock().unwrap().push(
+                                                       events::Event::PaymentFailed {
+                                                               payment_hash,
+                                                               rejected_by_dest: false,
 #[cfg(test)]
-                                                       error_code: None,
+                                                               error_code: None,
 #[cfg(test)]
-                                                       error_data: None,
-                                               }
-                                       )
+                                                               error_data: None,
+                                                       }
+                                               )
+                                       } else {
+                                               log_trace!(self.logger, "Received duplicative fail for HTLC with payment_hash {}", log_bytes!(payment_hash.0));
+                                       }
                                },
                        };
                }
@@ -2113,7 +2458,15 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                // from block_connected which may run during initialization prior to the chain_monitor
                // being fully configured. See the docs for `ChannelManagerReadArgs` for more.
                match source {
-                       HTLCSource::OutboundRoute { ref path, .. } => {
+                       HTLCSource::OutboundRoute { ref path, session_priv, .. } => {
+                               if {
+                                       let mut session_priv_bytes = [0; 32];
+                                       session_priv_bytes.copy_from_slice(&session_priv[..]);
+                                       !self.pending_outbound_payments.lock().unwrap().remove(&session_priv_bytes)
+                               } {
+                                       log_trace!(self.logger, "Received duplicative fail for HTLC with payment_hash {}", log_bytes!(payment_hash.0));
+                                       return;
+                               }
                                log_trace!(self.logger, "Failing outbound payment HTLC with payment_hash {}", log_bytes!(payment_hash.0));
                                mem::drop(channel_state_lock);
                                match &onion_error {
@@ -2209,24 +2562,22 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
        /// generating message events for the net layer to claim the payment, if possible. Thus, you
        /// should probably kick the net layer to go send messages if this returns true!
        ///
-       /// You must specify the expected amounts for this HTLC, and we will only claim HTLCs
-       /// available within a few percent of the expected amount. This is critical for several
-       /// reasons : a) it avoids providing senders with `proof-of-payment` (in the form of the
-       /// payment_preimage without having provided the full value and b) it avoids certain
-       /// privacy-breaking recipient-probing attacks which may reveal payment activity to
-       /// motivated attackers.
-       ///
-       /// Note that the privacy concerns in (b) are not relevant in payments with a payment_secret
-       /// set. Thus, for such payments we will claim any payments which do not under-pay.
+       /// Note that if you did not set an `amount_msat` when calling [`create_inbound_payment`] or
+       /// [`create_inbound_payment_for_hash`] you must check that the amount in the `PaymentReceived`
+       /// event matches your expectation. If you fail to do so and call this method, you may provide
+       /// the sender "proof-of-payment" when they did not fulfill the full expected payment.
        ///
        /// May panic if called except in response to a PaymentReceived event.
-       pub fn claim_funds(&self, payment_preimage: PaymentPreimage, payment_secret: &Option<PaymentSecret>, expected_amount: u64) -> bool {
+       ///
+       /// [`create_inbound_payment`]: Self::create_inbound_payment
+       /// [`create_inbound_payment_for_hash`]: Self::create_inbound_payment_for_hash
+       pub fn claim_funds(&self, payment_preimage: PaymentPreimage) -> bool {
                let payment_hash = PaymentHash(Sha256::hash(&payment_preimage.0).into_inner());
 
-               let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier);
+               let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);
 
                let mut channel_state = Some(self.channel_state.lock().unwrap());
-               let removed_source = channel_state.as_mut().unwrap().claimable_htlcs.remove(&(payment_hash, *payment_secret));
+               let removed_source = channel_state.as_mut().unwrap().claimable_htlcs.remove(&payment_hash);
                if let Some(mut sources) = removed_source {
                        assert!(!sources.is_empty());
 
@@ -2241,27 +2592,19 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                        // we got all the HTLCs and then a channel closed while we were waiting for the user to
                        // provide the preimage, so worrying too much about the optimal handling isn't worth
                        // it.
-
-                       let (is_mpp, mut valid_mpp) = if let &Some(ref data) = &sources[0].payment_data {
-                               assert!(payment_secret.is_some());
-                               (true, data.total_msat >= expected_amount)
-                       } else {
-                               assert!(payment_secret.is_none());
-                               (false, false)
-                       };
-
+                       let mut valid_mpp = true;
                        for htlc in sources.iter() {
-                               if !is_mpp || !valid_mpp { break; }
                                if let None = channel_state.as_ref().unwrap().short_to_id.get(&htlc.prev_hop.short_channel_id) {
                                        valid_mpp = false;
+                                       break;
                                }
                        }
 
                        let mut errs = Vec::new();
                        let mut claimed_any_htlcs = false;
                        for htlc in sources.drain(..) {
-                               if channel_state.is_none() { channel_state = Some(self.channel_state.lock().unwrap()); }
-                               if (is_mpp && !valid_mpp) || (!is_mpp && (htlc.value < expected_amount || htlc.value > expected_amount * 2)) {
+                               if !valid_mpp {
+                                       if channel_state.is_none() { channel_state = Some(self.channel_state.lock().unwrap()); }
                                        let mut htlc_msat_height_data = byte_utils::be64_to_array(htlc.value).to_vec();
                                        htlc_msat_height_data.extend_from_slice(&byte_utils::be32_to_array(
                                                        self.best_block.read().unwrap().height()));
@@ -2278,10 +2621,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                                                claimed_any_htlcs = true;
                                                        } else { errs.push(e); }
                                                },
-                                               Err(None) if is_mpp => unreachable!("We already checked for channel existence, we can't fail here!"),
-                                               Err(None) => {
-                                                       log_warn!(self.logger, "Channel we expected to claim an HTLC from was closed.");
-                                               },
+                                               Err(None) => unreachable!("We already checked for channel existence, we can't fail here!"),
                                                Ok(()) => claimed_any_htlcs = true,
                                        }
                                }
@@ -2355,12 +2695,20 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
 
        fn claim_funds_internal(&self, mut channel_state_lock: MutexGuard<ChannelHolder<Signer>>, source: HTLCSource, payment_preimage: PaymentPreimage) {
                match source {
-                       HTLCSource::OutboundRoute { .. } => {
+                       HTLCSource::OutboundRoute { session_priv, .. } => {
                                mem::drop(channel_state_lock);
-                               let mut pending_events = self.pending_events.lock().unwrap();
-                               pending_events.push(events::Event::PaymentSent {
-                                       payment_preimage
-                               });
+                               if {
+                                       let mut session_priv_bytes = [0; 32];
+                                       session_priv_bytes.copy_from_slice(&session_priv[..]);
+                                       self.pending_outbound_payments.lock().unwrap().remove(&session_priv_bytes)
+                               } {
+                                       let mut pending_events = self.pending_events.lock().unwrap();
+                                       pending_events.push(events::Event::PaymentSent {
+                                               payment_preimage
+                                       });
+                               } else {
+                                       log_trace!(self.logger, "Received duplicative fulfill for HTLC with payment_preimage {}", log_bytes!(payment_preimage.0));
+                               }
                        },
                        HTLCSource::PreviousHopData(hop_data) => {
                                let prev_outpoint = hop_data.outpoint;
@@ -2418,86 +2766,26 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
        ///  4) once all remote copies are updated, you call this function with the update_id that
        ///     completed, and once it is the latest the Channel will be re-enabled.
        pub fn channel_monitor_updated(&self, funding_txo: &OutPoint, highest_applied_update_id: u64) {
-               let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier);
-
-               let mut close_results = Vec::new();
-               let mut htlc_forwards = Vec::new();
-               let mut htlc_failures = Vec::new();
-               let mut pending_events = Vec::new();
+               let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);
 
-               {
+               let (mut pending_failures, chan_restoration_res) = {
                        let mut channel_lock = self.channel_state.lock().unwrap();
                        let channel_state = &mut *channel_lock;
-                       let short_to_id = &mut channel_state.short_to_id;
-                       let pending_msg_events = &mut channel_state.pending_msg_events;
-                       let channel = match channel_state.by_id.get_mut(&funding_txo.to_channel_id()) {
-                               Some(chan) => chan,
-                               None => return,
+                       let mut channel = match channel_state.by_id.entry(funding_txo.to_channel_id()) {
+                               hash_map::Entry::Occupied(chan) => chan,
+                               hash_map::Entry::Vacant(_) => return,
                        };
-                       if !channel.is_awaiting_monitor_update() || channel.get_latest_monitor_update_id() != highest_applied_update_id {
+                       if !channel.get().is_awaiting_monitor_update() || channel.get().get_latest_monitor_update_id() != highest_applied_update_id {
                                return;
                        }
 
-                       let (raa, commitment_update, order, pending_forwards, mut pending_failures, funding_broadcastable, funding_locked) = channel.monitor_updating_restored(&self.logger);
-                       if !pending_forwards.is_empty() {
-                               htlc_forwards.push((channel.get_short_channel_id().expect("We can't have pending forwards before funding confirmation"), funding_txo.clone(), pending_forwards));
-                       }
-                       htlc_failures.append(&mut pending_failures);
-
-                       macro_rules! handle_cs { () => {
-                               if let Some(update) = commitment_update {
-                                       pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs {
-                                               node_id: channel.get_counterparty_node_id(),
-                                               updates: update,
-                                       });
-                               }
-                       } }
-                       macro_rules! handle_raa { () => {
-                               if let Some(revoke_and_ack) = raa {
-                                       pending_msg_events.push(events::MessageSendEvent::SendRevokeAndACK {
-                                               node_id: channel.get_counterparty_node_id(),
-                                               msg: revoke_and_ack,
-                                       });
-                               }
-                       } }
-                       match order {
-                               RAACommitmentOrder::CommitmentFirst => {
-                                       handle_cs!();
-                                       handle_raa!();
-                               },
-                               RAACommitmentOrder::RevokeAndACKFirst => {
-                                       handle_raa!();
-                                       handle_cs!();
-                               },
-                       }
-                       if let Some(tx) = funding_broadcastable {
-                               self.tx_broadcaster.broadcast_transaction(&tx);
-                       }
-                       if let Some(msg) = funding_locked {
-                               pending_msg_events.push(events::MessageSendEvent::SendFundingLocked {
-                                       node_id: channel.get_counterparty_node_id(),
-                                       msg,
-                               });
-                               if let Some(announcement_sigs) = self.get_announcement_sigs(channel) {
-                                       pending_msg_events.push(events::MessageSendEvent::SendAnnouncementSignatures {
-                                               node_id: channel.get_counterparty_node_id(),
-                                               msg: announcement_sigs,
-                                       });
-                               }
-                               short_to_id.insert(channel.get_short_channel_id().unwrap(), channel.channel_id());
-                       }
-               }
-
-               self.pending_events.lock().unwrap().append(&mut pending_events);
-
-               for failure in htlc_failures.drain(..) {
+                       let (raa, commitment_update, order, pending_forwards, pending_failures, funding_broadcastable, funding_locked) = channel.get_mut().monitor_updating_restored(&self.logger);
+                       (pending_failures, handle_chan_restoration_locked!(self, channel_lock, channel_state, channel, raa, commitment_update, order, None, pending_forwards, funding_broadcastable, funding_locked))
+               };
+               post_handle_chan_restoration!(self, chan_restoration_res);
+               for failure in pending_failures.drain(..) {
                        self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), failure.0, &failure.1, failure.2);
                }
-               self.forward_htlcs(&mut htlc_forwards[..]);
-
-               for res in close_results.drain(..) {
-                       self.finish_force_close_channel(res);
-               }
        }
 
        fn internal_open_channel(&self, counterparty_node_id: &PublicKey, their_features: InitFeatures, msg: &msgs::OpenChannel) -> Result<(), MsgHandleErrInternal> {
@@ -2626,6 +2914,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel".to_owned(), msg.channel_id))
                        }
                };
+               log_info!(self.logger, "Broadcasting funding transaction with txid {}", funding_tx.txid());
                self.tx_broadcaster.broadcast_transaction(&funding_tx);
                Ok(())
        }
@@ -2740,7 +3029,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                        }
                };
                if let Some(broadcast_tx) = tx {
-                       log_trace!(self.logger, "Broadcast onchain {}", log_tx!(broadcast_tx));
+                       log_info!(self.logger, "Broadcasting {}", log_tx!(broadcast_tx));
                        self.tx_broadcaster.broadcast_transaction(&broadcast_tx);
                }
                if let Some(chan) = chan_option {
@@ -3042,39 +3331,8 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                        return Err(MsgHandleErrInternal::from_no_close(LightningError{err: "Got an announcement_signatures before we were ready for it".to_owned(), action: msgs::ErrorAction::IgnoreError}));
                                }
 
-                               let our_node_id = self.get_our_node_id();
-                               let (announcement, our_bitcoin_sig) =
-                                       try_chan_entry!(self, chan.get_mut().get_channel_announcement(our_node_id.clone(), self.genesis_hash.clone()), channel_state, chan);
-
-                               let were_node_one = announcement.node_id_1 == our_node_id;
-                               let msghash = hash_to_message!(&Sha256dHash::hash(&announcement.encode()[..])[..]);
-                               {
-                                       let their_node_key = if were_node_one { &announcement.node_id_2 } else { &announcement.node_id_1 };
-                                       let their_bitcoin_key = if were_node_one { &announcement.bitcoin_key_2 } else { &announcement.bitcoin_key_1 };
-                                       match (self.secp_ctx.verify(&msghash, &msg.node_signature, their_node_key),
-                                                  self.secp_ctx.verify(&msghash, &msg.bitcoin_signature, their_bitcoin_key)) {
-                                               (Err(e), _) => {
-                                                       let chan_err: ChannelError = ChannelError::Close(format!("Bad announcement_signatures. Failed to verify node_signature: {:?}. Maybe using different node_secret for transport and routing msg? UnsignedChannelAnnouncement used for verification is {:?}. their_node_key is {:?}", e, &announcement, their_node_key));
-                                                       try_chan_entry!(self, Err(chan_err), channel_state, chan);
-                                               },
-                                               (_, Err(e)) => {
-                                                       let chan_err: ChannelError = ChannelError::Close(format!("Bad announcement_signatures. Failed to verify bitcoin_signature: {:?}. UnsignedChannelAnnouncement used for verification is {:?}. their_bitcoin_key is ({:?})", e, &announcement, their_bitcoin_key));
-                                                       try_chan_entry!(self, Err(chan_err), channel_state, chan);
-                                               },
-                                               _ => {}
-                                       }
-                               }
-
-                               let our_node_sig = self.secp_ctx.sign(&msghash, &self.our_network_key);
-
                                channel_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelAnnouncement {
-                                       msg: msgs::ChannelAnnouncement {
-                                               node_signature_1: if were_node_one { our_node_sig } else { msg.node_signature },
-                                               node_signature_2: if were_node_one { msg.node_signature } else { our_node_sig },
-                                               bitcoin_signature_1: if were_node_one { our_bitcoin_sig } else { msg.bitcoin_signature },
-                                               bitcoin_signature_2: if were_node_one { msg.bitcoin_signature } else { our_bitcoin_sig },
-                                               contents: announcement,
-                                       },
+                                       msg: try_chan_entry!(self, chan.get_mut().announcement_signatures(&self.our_network_key, self.get_our_node_id(), self.genesis_hash.clone(), msg), channel_state, chan),
                                        update_msg: self.get_channel_update(chan.get()).unwrap(), // can only fail if we're not in a ready state
                                });
                        },
@@ -3107,77 +3365,35 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
        }
 
        fn internal_channel_reestablish(&self, counterparty_node_id: &PublicKey, msg: &msgs::ChannelReestablish) -> Result<(), MsgHandleErrInternal> {
-               let mut channel_state_lock = self.channel_state.lock().unwrap();
-               let channel_state = &mut *channel_state_lock;
+               let (htlcs_failed_forward, chan_restoration_res) = {
+                       let mut channel_state_lock = self.channel_state.lock().unwrap();
+                       let channel_state = &mut *channel_state_lock;
 
-               match channel_state.by_id.entry(msg.channel_id) {
-                       hash_map::Entry::Occupied(mut chan) => {
-                               if chan.get().get_counterparty_node_id() != *counterparty_node_id {
-                                       return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!".to_owned(), msg.channel_id));
-                               }
-                               // Currently, we expect all holding cell update_adds to be dropped on peer
-                               // disconnect, so Channel's reestablish will never hand us any holding cell
-                               // freed HTLCs to fail backwards. If in the future we no longer drop pending
-                               // add-HTLCs on disconnect, we may be handed HTLCs to fail backwards here.
-                               let (funding_locked, revoke_and_ack, commitment_update, monitor_update_opt, mut order, shutdown) =
-                                       try_chan_entry!(self, chan.get_mut().channel_reestablish(msg, &self.logger), channel_state, chan);
-                               if let Some(monitor_update) = monitor_update_opt {
-                                       if let Err(e) = self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), monitor_update) {
-                                               // channel_reestablish doesn't guarantee the order it returns is sensical
-                                               // for the messages it returns, but if we're setting what messages to
-                                               // re-transmit on monitor update success, we need to make sure it is sane.
-                                               if revoke_and_ack.is_none() {
-                                                       order = RAACommitmentOrder::CommitmentFirst;
-                                               }
-                                               if commitment_update.is_none() {
-                                                       order = RAACommitmentOrder::RevokeAndACKFirst;
-                                               }
-                                               return_monitor_err!(self, e, channel_state, chan, order, revoke_and_ack.is_some(), commitment_update.is_some());
-                                               //TODO: Resend the funding_locked if needed once we get the monitor running again
-                                       }
-                               }
-                               if let Some(msg) = funding_locked {
-                                       channel_state.pending_msg_events.push(events::MessageSendEvent::SendFundingLocked {
-                                               node_id: counterparty_node_id.clone(),
-                                               msg
-                                       });
-                               }
-                               macro_rules! send_raa { () => {
-                                       if let Some(msg) = revoke_and_ack {
-                                               channel_state.pending_msg_events.push(events::MessageSendEvent::SendRevokeAndACK {
-                                                       node_id: counterparty_node_id.clone(),
-                                                       msg
-                                               });
+                       match channel_state.by_id.entry(msg.channel_id) {
+                               hash_map::Entry::Occupied(mut chan) => {
+                                       if chan.get().get_counterparty_node_id() != *counterparty_node_id {
+                                               return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!".to_owned(), msg.channel_id));
                                        }
-                               } }
-                               macro_rules! send_cu { () => {
-                                       if let Some(updates) = commitment_update {
-                                               channel_state.pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs {
+                                       // Currently, we expect all holding cell update_adds to be dropped on peer
+                                       // disconnect, so Channel's reestablish will never hand us any holding cell
+                                       // freed HTLCs to fail backwards. If in the future we no longer drop pending
+                                       // add-HTLCs on disconnect, we may be handed HTLCs to fail backwards here.
+                                       let (funding_locked, revoke_and_ack, commitment_update, monitor_update_opt, order, htlcs_failed_forward, shutdown) =
+                                               try_chan_entry!(self, chan.get_mut().channel_reestablish(msg, &self.logger), channel_state, chan);
+                                       if let Some(msg) = shutdown {
+                                               channel_state.pending_msg_events.push(events::MessageSendEvent::SendShutdown {
                                                        node_id: counterparty_node_id.clone(),
-                                                       updates
+                                                       msg,
                                                });
                                        }
-                               } }
-                               match order {
-                                       RAACommitmentOrder::RevokeAndACKFirst => {
-                                               send_raa!();
-                                               send_cu!();
-                                       },
-                                       RAACommitmentOrder::CommitmentFirst => {
-                                               send_cu!();
-                                               send_raa!();
-                                       },
-                               }
-                               if let Some(msg) = shutdown {
-                                       channel_state.pending_msg_events.push(events::MessageSendEvent::SendShutdown {
-                                               node_id: counterparty_node_id.clone(),
-                                               msg,
-                                       });
-                               }
-                               Ok(())
-                       },
-                       hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel".to_owned(), msg.channel_id))
-               }
+                                       (htlcs_failed_forward, handle_chan_restoration_locked!(self, channel_state_lock, channel_state, chan, revoke_and_ack, commitment_update, order, monitor_update_opt, Vec::new(), None, funding_locked))
+                               },
+                               hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel".to_owned(), msg.channel_id))
+                       }
+               };
+               post_handle_chan_restoration!(self, chan_restoration_res);
+               self.fail_holding_cell_htlcs(htlcs_failed_forward, msg.channel_id);
+               Ok(())
        }
 
        /// Begin Update fee process. Allowed only on an outbound channel.
@@ -3187,7 +3403,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
        /// (C-not exported) Cause its doc(hidden) anyway
        #[doc(hidden)]
        pub fn update_fee(&self, channel_id: [u8;32], feerate_per_kw: u32) -> Result<(), APIError> {
-               let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier);
+               let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);
                let counterparty_node_id;
                let err: Result<(), _> = loop {
                        let mut channel_state_lock = self.channel_state.lock().unwrap();
@@ -3235,52 +3451,115 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                }
        }
 
-       /// Process pending events from the `chain::Watch`.
-       fn process_pending_monitor_events(&self) {
+       /// Process pending events from the `chain::Watch`, returning whether any events were processed.
+       fn process_pending_monitor_events(&self) -> bool {
                let mut failed_channels = Vec::new();
+               let pending_monitor_events = self.chain_monitor.release_pending_monitor_events();
+               let has_pending_monitor_events = !pending_monitor_events.is_empty();
+               for monitor_event in pending_monitor_events {
+                       match monitor_event {
+                               MonitorEvent::HTLCEvent(htlc_update) => {
+                                       if let Some(preimage) = htlc_update.payment_preimage {
+                                               log_trace!(self.logger, "Claiming HTLC with preimage {} from our monitor", log_bytes!(preimage.0));
+                                               self.claim_funds_internal(self.channel_state.lock().unwrap(), htlc_update.source, preimage);
+                                       } else {
+                                               log_trace!(self.logger, "Failing HTLC with hash {} from our monitor", log_bytes!(htlc_update.payment_hash.0));
+                                               self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), htlc_update.source, &htlc_update.payment_hash, HTLCFailReason::Reason { failure_code: 0x4000 | 8, data: Vec::new() });
+                                       }
+                               },
+                               MonitorEvent::CommitmentTxBroadcasted(funding_outpoint) => {
+                                       let mut channel_lock = self.channel_state.lock().unwrap();
+                                       let channel_state = &mut *channel_lock;
+                                       let by_id = &mut channel_state.by_id;
+                                       let short_to_id = &mut channel_state.short_to_id;
+                                       let pending_msg_events = &mut channel_state.pending_msg_events;
+                                       if let Some(mut chan) = by_id.remove(&funding_outpoint.to_channel_id()) {
+                                               if let Some(short_id) = chan.get_short_channel_id() {
+                                                       short_to_id.remove(&short_id);
+                                               }
+                                               failed_channels.push(chan.force_shutdown(false));
+                                               if let Ok(update) = self.get_channel_update(&chan) {
+                                                       pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
+                                                               msg: update
+                                                       });
+                                               }
+                                               pending_msg_events.push(events::MessageSendEvent::HandleError {
+                                                       node_id: chan.get_counterparty_node_id(),
+                                                       action: msgs::ErrorAction::SendErrorMessage {
+                                                               msg: msgs::ErrorMessage { channel_id: chan.channel_id(), data: "Channel force-closed".to_owned() }
+                                                       },
+                                               });
+                                       }
+                               },
+                       }
+               }
+
+               for failure in failed_channels.drain(..) {
+                       self.finish_force_close_channel(failure);
+               }
+
+               has_pending_monitor_events
+       }
+
+       /// Check the holding cell in each channel and free any pending HTLCs in them if possible.
+       /// Returns whether there were any updates such as if pending HTLCs were freed or a monitor
+       /// update was applied.
+       ///
+       /// This should only apply to HTLCs which were added to the holding cell because we were
+       /// waiting on a monitor update to finish. In that case, we don't want to free the holding cell
+       /// directly in `channel_monitor_updated` as it may introduce deadlocks calling back into user
+       /// code to inform them of a channel monitor update.
+       fn check_free_holding_cells(&self) -> bool {
+               let mut has_monitor_update = false;
+               let mut failed_htlcs = Vec::new();
+               let mut handle_errors = Vec::new();
                {
-                       for monitor_event in self.chain_monitor.release_pending_monitor_events() {
-                               match monitor_event {
-                                       MonitorEvent::HTLCEvent(htlc_update) => {
-                                               if let Some(preimage) = htlc_update.payment_preimage {
-                                                       log_trace!(self.logger, "Claiming HTLC with preimage {} from our monitor", log_bytes!(preimage.0));
-                                                       self.claim_funds_internal(self.channel_state.lock().unwrap(), htlc_update.source, preimage);
-                                               } else {
-                                                       log_trace!(self.logger, "Failing HTLC with hash {} from our monitor", log_bytes!(htlc_update.payment_hash.0));
-                                                       self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), htlc_update.source, &htlc_update.payment_hash, HTLCFailReason::Reason { failure_code: 0x4000 | 8, data: Vec::new() });
+                       let mut channel_state_lock = self.channel_state.lock().unwrap();
+                       let channel_state = &mut *channel_state_lock;
+                       let by_id = &mut channel_state.by_id;
+                       let short_to_id = &mut channel_state.short_to_id;
+                       let pending_msg_events = &mut channel_state.pending_msg_events;
+
+                       by_id.retain(|channel_id, chan| {
+                               match chan.maybe_free_holding_cell_htlcs(&self.logger) {
+                                       Ok((commitment_opt, holding_cell_failed_htlcs)) => {
+                                               if !holding_cell_failed_htlcs.is_empty() {
+                                                       failed_htlcs.push((holding_cell_failed_htlcs, *channel_id));
                                                }
-                                       },
-                                       MonitorEvent::CommitmentTxBroadcasted(funding_outpoint) => {
-                                               let mut channel_lock = self.channel_state.lock().unwrap();
-                                               let channel_state = &mut *channel_lock;
-                                               let by_id = &mut channel_state.by_id;
-                                               let short_to_id = &mut channel_state.short_to_id;
-                                               let pending_msg_events = &mut channel_state.pending_msg_events;
-                                               if let Some(mut chan) = by_id.remove(&funding_outpoint.to_channel_id()) {
-                                                       if let Some(short_id) = chan.get_short_channel_id() {
-                                                               short_to_id.remove(&short_id);
-                                                       }
-                                                       failed_channels.push(chan.force_shutdown(false));
-                                                       if let Ok(update) = self.get_channel_update(&chan) {
-                                                               pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
-                                                                       msg: update
+                                               if let Some((commitment_update, monitor_update)) = commitment_opt {
+                                                       if let Err(e) = self.chain_monitor.update_channel(chan.get_funding_txo().unwrap(), monitor_update) {
+                                                               has_monitor_update = true;
+                                                               let (res, close_channel) = handle_monitor_err!(self, e, short_to_id, chan, RAACommitmentOrder::CommitmentFirst, false, true, Vec::new(), Vec::new(), channel_id);
+                                                               handle_errors.push((chan.get_counterparty_node_id(), res));
+                                                               if close_channel { return false; }
+                                                       } else {
+                                                               pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs {
+                                                                       node_id: chan.get_counterparty_node_id(),
+                                                                       updates: commitment_update,
                                                                });
                                                        }
-                                                       pending_msg_events.push(events::MessageSendEvent::HandleError {
-                                                               node_id: chan.get_counterparty_node_id(),
-                                                               action: msgs::ErrorAction::SendErrorMessage {
-                                                                       msg: msgs::ErrorMessage { channel_id: chan.channel_id(), data: "Channel force-closed".to_owned() }
-                                                               },
-                                                       });
                                                }
+                                               true
                                        },
+                                       Err(e) => {
+                                               let (close_channel, res) = convert_chan_err!(self, e, short_to_id, chan, channel_id);
+                                               handle_errors.push((chan.get_counterparty_node_id(), Err(res)));
+                                               !close_channel
+                                       }
                                }
-                       }
+                       });
                }
 
-               for failure in failed_channels.drain(..) {
-                       self.finish_force_close_channel(failure);
+               let has_update = has_monitor_update || !failed_htlcs.is_empty();
+               for (failures, channel_id) in failed_htlcs.drain(..) {
+                       self.fail_holding_cell_htlcs(failures, channel_id);
                }
+
+               for (counterparty_node_id, err) in handle_errors.drain(..) {
+                       let _ = handle_error!(self, err, counterparty_node_id);
+               }
+
+               has_update
        }
 
        /// Handle a list of channel failures during a block_connected or block_disconnected call,
@@ -3305,6 +3584,114 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                        self.finish_force_close_channel(failure);
                }
        }
+
+       fn set_payment_hash_secret_map(&self, payment_hash: PaymentHash, payment_preimage: Option<PaymentPreimage>, min_value_msat: Option<u64>, invoice_expiry_delta_secs: u32, user_payment_id: u64) -> Result<PaymentSecret, APIError> {
+               assert!(invoice_expiry_delta_secs <= 60*60*24*365); // Sadly bitcoin timestamps are u32s, so panic before 2106
+
+               let payment_secret = PaymentSecret(self.keys_manager.get_secure_random_bytes());
+
+               let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);
+               let mut payment_secrets = self.pending_inbound_payments.lock().unwrap();
+               match payment_secrets.entry(payment_hash) {
+                       hash_map::Entry::Vacant(e) => {
+                               e.insert(PendingInboundPayment {
+                                       payment_secret, min_value_msat, user_payment_id, payment_preimage,
+                                       // We assume that highest_seen_timestamp is pretty close to the current time -
+                                       // its updated when we receive a new block with the maximum time we've seen in
+                                       // a header. It should never be more than two hours in the future.
+                                       // Thus, we add two hours here as a buffer to ensure we absolutely
+                                       // never fail a payment too early.
+                                       // Note that we assume that received blocks have reasonably up-to-date
+                                       // timestamps.
+                                       expiry_time: self.highest_seen_timestamp.load(Ordering::Acquire) as u64 + invoice_expiry_delta_secs as u64 + 7200,
+                               });
+                       },
+                       hash_map::Entry::Occupied(_) => return Err(APIError::APIMisuseError { err: "Duplicate payment hash".to_owned() }),
+               }
+               Ok(payment_secret)
+       }
+
+       /// Gets a payment secret and payment hash for use in an invoice given to a third party wishing
+       /// to pay us.
+       ///
+       /// This differs from [`create_inbound_payment_for_hash`] only in that it generates the
+       /// [`PaymentHash`] and [`PaymentPreimage`] for you, returning the first and storing the second.
+       ///
+       /// The [`PaymentPreimage`] will ultimately be returned to you in the [`PaymentReceived`], which
+       /// will have the [`PaymentReceived::payment_preimage`] field filled in. That should then be
+       /// passed directly to [`claim_funds`].
+       ///
+       /// See [`create_inbound_payment_for_hash`] for detailed documentation on behavior and requirements.
+       ///
+       /// [`claim_funds`]: Self::claim_funds
+       /// [`PaymentReceived`]: events::Event::PaymentReceived
+       /// [`PaymentReceived::payment_preimage`]: events::Event::PaymentReceived::payment_preimage
+       /// [`create_inbound_payment_for_hash`]: Self::create_inbound_payment_for_hash
+       pub fn create_inbound_payment(&self, min_value_msat: Option<u64>, invoice_expiry_delta_secs: u32, user_payment_id: u64) -> (PaymentHash, PaymentSecret) {
+               let payment_preimage = PaymentPreimage(self.keys_manager.get_secure_random_bytes());
+               let payment_hash = PaymentHash(Sha256::hash(&payment_preimage.0).into_inner());
+
+               (payment_hash,
+                       self.set_payment_hash_secret_map(payment_hash, Some(payment_preimage), min_value_msat, invoice_expiry_delta_secs, user_payment_id)
+                               .expect("RNG Generated Duplicate PaymentHash"))
+       }
+
+       /// Gets a [`PaymentSecret`] for a given [`PaymentHash`], for which the payment preimage is
+       /// stored external to LDK.
+       ///
+       /// A [`PaymentReceived`] event will only be generated if the [`PaymentSecret`] matches a
+       /// payment secret fetched via this method or [`create_inbound_payment`], and which is at least
+       /// the `min_value_msat` provided here, if one is provided.
+       ///
+       /// The [`PaymentHash`] (and corresponding [`PaymentPreimage`]) must be globally unique. This
+       /// method may return an Err if another payment with the same payment_hash is still pending.
+       ///
+       /// `user_payment_id` will be provided back in [`PaymentReceived::user_payment_id`] events to
+       /// allow tracking of which events correspond with which calls to this and
+       /// [`create_inbound_payment`]. `user_payment_id` has no meaning inside of LDK, it is simply
+       /// copied to events and otherwise ignored. It may be used to correlate PaymentReceived events
+       /// with invoice metadata stored elsewhere.
+       ///
+       /// `min_value_msat` should be set if the invoice being generated contains a value. Any payment
+       /// received for the returned [`PaymentHash`] will be required to be at least `min_value_msat`
+       /// before a [`PaymentReceived`] event will be generated, ensuring that we do not provide the
+       /// sender "proof-of-payment" unless they have paid the required amount.
+       ///
+       /// `invoice_expiry_delta_secs` describes the number of seconds that the invoice is valid for
+       /// in excess of the current time. This should roughly match the expiry time set in the invoice.
+       /// After this many seconds, we will remove the inbound payment, resulting in any attempts to
+       /// pay the invoice failing. The BOLT spec suggests 3,600 secs as a default validity time for
+       /// invoices when no timeout is set.
+       ///
+       /// Note that we use block header time to time-out pending inbound payments (with some margin
+       /// to compensate for the inaccuracy of block header timestamps). Thus, in practice we will
+       /// accept a payment and generate a [`PaymentReceived`] event for some time after the expiry.
+       /// If you need exact expiry semantics, you should enforce them upon receipt of
+       /// [`PaymentReceived`].
+       ///
+       /// Pending inbound payments are stored in memory and in serialized versions of this
+       /// [`ChannelManager`]. If potentially unbounded numbers of inbound payments may exist and
+       /// space is limited, you may wish to rate-limit inbound payment creation.
+       ///
+       /// May panic if `invoice_expiry_delta_secs` is greater than one year.
+       ///
+       /// Note that invoices generated for inbound payments should have their `min_final_cltv_expiry`
+       /// set to at least [`MIN_FINAL_CLTV_EXPIRY`].
+       ///
+       /// [`create_inbound_payment`]: Self::create_inbound_payment
+       /// [`PaymentReceived`]: events::Event::PaymentReceived
+       /// [`PaymentReceived::user_payment_id`]: events::Event::PaymentReceived::user_payment_id
+       pub fn create_inbound_payment_for_hash(&self, payment_hash: PaymentHash, min_value_msat: Option<u64>, invoice_expiry_delta_secs: u32, user_payment_id: u64) -> Result<PaymentSecret, APIError> {
+               self.set_payment_hash_secret_map(payment_hash, None, min_value_msat, invoice_expiry_delta_secs, user_payment_id)
+       }
+
+       #[cfg(any(test, feature = "fuzztarget", feature = "_test_utils"))]
+       pub fn get_and_clear_pending_events(&self) -> Vec<events::Event> {
+               let events = core::cell::RefCell::new(Vec::new());
+               let event_handler = |event| events.borrow_mut().push(event);
+               self.process_pending_events(&event_handler);
+               events.into_inner()
+       }
 }
 
 impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> MessageSendEventsProvider for ChannelManager<Signer, M, T, K, F, L>
@@ -3315,33 +3702,71 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> MessageSend
                                L::Target: Logger,
 {
        fn get_and_clear_pending_msg_events(&self) -> Vec<MessageSendEvent> {
-               //TODO: This behavior should be documented. It's non-intuitive that we query
-               // ChannelMonitors when clearing other events.
-               self.process_pending_monitor_events();
+               let events = RefCell::new(Vec::new());
+               PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock, &self.persistence_notifier, || {
+                       let mut result = NotifyOption::SkipPersist;
+
+                       // TODO: This behavior should be documented. It's unintuitive that we query
+                       // ChannelMonitors when clearing other events.
+                       if self.process_pending_monitor_events() {
+                               result = NotifyOption::DoPersist;
+                       }
 
-               let mut ret = Vec::new();
-               let mut channel_state = self.channel_state.lock().unwrap();
-               mem::swap(&mut ret, &mut channel_state.pending_msg_events);
-               ret
+                       if self.check_free_holding_cells() {
+                               result = NotifyOption::DoPersist;
+                       }
+
+                       let mut pending_events = Vec::new();
+                       let mut channel_state = self.channel_state.lock().unwrap();
+                       mem::swap(&mut pending_events, &mut channel_state.pending_msg_events);
+
+                       if !pending_events.is_empty() {
+                               events.replace(pending_events);
+                       }
+
+                       result
+               });
+               events.into_inner()
        }
 }
 
 impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> EventsProvider for ChannelManager<Signer, M, T, K, F, L>
-       where M::Target: chain::Watch<Signer>,
-        T::Target: BroadcasterInterface,
-        K::Target: KeysInterface<Signer = Signer>,
-        F::Target: FeeEstimator,
-                               L::Target: Logger,
+where
+       M::Target: chain::Watch<Signer>,
+       T::Target: BroadcasterInterface,
+       K::Target: KeysInterface<Signer = Signer>,
+       F::Target: FeeEstimator,
+       L::Target: Logger,
 {
-       fn get_and_clear_pending_events(&self) -> Vec<Event> {
-               //TODO: This behavior should be documented. It's non-intuitive that we query
-               // ChannelMonitors when clearing other events.
-               self.process_pending_monitor_events();
+       /// Processes events that must be periodically handled.
+       ///
+       /// An [`EventHandler`] may safely call back to the provider in order to handle an event.
+       /// However, it must not call [`Writeable::write`] as doing so would result in a deadlock.
+       ///
+       /// Pending events are persisted as part of [`ChannelManager`]. While these events are cleared
+       /// when processed, an [`EventHandler`] must be able to handle previously seen events when
+       /// restarting from an old state.
+       fn process_pending_events<H: Deref>(&self, handler: H) where H::Target: EventHandler {
+               PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock, &self.persistence_notifier, || {
+                       let mut result = NotifyOption::SkipPersist;
+
+                       // TODO: This behavior should be documented. It's unintuitive that we query
+                       // ChannelMonitors when clearing other events.
+                       if self.process_pending_monitor_events() {
+                               result = NotifyOption::DoPersist;
+                       }
 
-               let mut ret = Vec::new();
-               let mut pending_events = self.pending_events.lock().unwrap();
-               mem::swap(&mut ret, &mut *pending_events);
-               ret
+                       let mut pending_events = std::mem::replace(&mut *self.pending_events.lock().unwrap(), vec![]);
+                       if !pending_events.is_empty() {
+                               result = NotifyOption::DoPersist;
+                       }
+
+                       for event in pending_events.drain(..) {
+                               handler.handle_event(event);
+                       }
+
+                       result
+               });
        }
 }
 
@@ -3363,12 +3788,12 @@ where
                }
 
                let txdata: Vec<_> = block.txdata.iter().enumerate().collect();
-               self.transactions_confirmed(&block.header, height, &txdata);
-               self.update_best_block(&block.header, height);
+               self.transactions_confirmed(&block.header, &txdata, height);
+               self.best_block_updated(&block.header, height);
        }
 
        fn block_disconnected(&self, header: &BlockHeader, height: u32) {
-               let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier);
+               let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);
                let new_height = height - 1;
                {
                        let mut best_block = self.best_block.write().unwrap();
@@ -3379,16 +3804,98 @@ where
                        *best_block = BestBlock::new(header.prev_blockhash, new_height)
                }
 
-               self.do_chain_event(Some(new_height), |channel| channel.update_best_block(new_height, header.time));
+               self.do_chain_event(Some(new_height), |channel| channel.best_block_updated(new_height, header.time));
+       }
+}
+
+impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> chain::Confirm for ChannelManager<Signer, M, T, K, F, L>
+where
+       M::Target: chain::Watch<Signer>,
+       T::Target: BroadcasterInterface,
+       K::Target: KeysInterface<Signer = Signer>,
+       F::Target: FeeEstimator,
+       L::Target: Logger,
+{
+       fn transactions_confirmed(&self, header: &BlockHeader, txdata: &TransactionData, height: u32) {
+               // Note that we MUST NOT end up calling methods on self.chain_monitor here - we're called
+               // during initialization prior to the chain_monitor being fully configured in some cases.
+               // See the docs for `ChannelManagerReadArgs` for more.
+
+               let block_hash = header.block_hash();
+               log_trace!(self.logger, "{} transactions included in block {} at height {} provided", txdata.len(), block_hash, height);
+
+               let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);
+               self.do_chain_event(Some(height), |channel| channel.transactions_confirmed(&block_hash, height, txdata, &self.logger).map(|a| (a, Vec::new())));
+       }
+
+       fn best_block_updated(&self, header: &BlockHeader, height: u32) {
+               // Note that we MUST NOT end up calling methods on self.chain_monitor here - we're called
+               // during initialization prior to the chain_monitor being fully configured in some cases.
+               // See the docs for `ChannelManagerReadArgs` for more.
+
+               let block_hash = header.block_hash();
+               log_trace!(self.logger, "New best block: {} at height {}", block_hash, height);
+
+               let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);
+
+               *self.best_block.write().unwrap() = BestBlock::new(block_hash, height);
+
+               self.do_chain_event(Some(height), |channel| channel.best_block_updated(height, header.time));
+
+               macro_rules! max_time {
+                       ($timestamp: expr) => {
+                               loop {
+                                       // Update $timestamp to be the max of its current value and the block
+                                       // timestamp. This should keep us close to the current time without relying on
+                                       // having an explicit local time source.
+                                       // Just in case we end up in a race, we loop until we either successfully
+                                       // update $timestamp or decide we don't need to.
+                                       let old_serial = $timestamp.load(Ordering::Acquire);
+                                       if old_serial >= header.time as usize { break; }
+                                       if $timestamp.compare_exchange(old_serial, header.time as usize, Ordering::AcqRel, Ordering::Relaxed).is_ok() {
+                                               break;
+                                       }
+                               }
+                       }
+               }
+               max_time!(self.last_node_announcement_serial);
+               max_time!(self.highest_seen_timestamp);
+               let mut payment_secrets = self.pending_inbound_payments.lock().unwrap();
+               payment_secrets.retain(|_, inbound_payment| {
+                       inbound_payment.expiry_time > header.time as u64
+               });
+       }
+
+       fn get_relevant_txids(&self) -> Vec<Txid> {
+               let channel_state = self.channel_state.lock().unwrap();
+               let mut res = Vec::with_capacity(channel_state.short_to_id.len());
+               for chan in channel_state.by_id.values() {
+                       if let Some(funding_txo) = chan.get_funding_txo() {
+                               res.push(funding_txo.txid);
+                       }
+               }
+               res
+       }
+
+       fn transaction_unconfirmed(&self, txid: &Txid) {
+               let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);
+               self.do_chain_event(None, |channel| {
+                       if let Some(funding_txo) = channel.get_funding_txo() {
+                               if funding_txo.txid == *txid {
+                                       channel.funding_transaction_unconfirmed().map(|_| (None, Vec::new()))
+                               } else { Ok((None, Vec::new())) }
+                       } else { Ok((None, Vec::new())) }
+               });
        }
 }
 
 impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<Signer, M, T, K, F, L>
-       where M::Target: chain::Watch<Signer>,
-        T::Target: BroadcasterInterface,
-        K::Target: KeysInterface<Signer = Signer>,
-        F::Target: FeeEstimator,
-        L::Target: Logger,
+where
+       M::Target: chain::Watch<Signer>,
+       T::Target: BroadcasterInterface,
+       K::Target: KeysInterface<Signer = Signer>,
+       F::Target: FeeEstimator,
+       L::Target: Logger,
 {
        /// Calls a function which handles an on-chain event (blocks dis/connected, transactions
        /// un/confirmed, etc) on each channel, handling any resulting errors or messages generated by
@@ -3454,7 +3961,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                        });
 
                        if let Some(height) = height_opt {
-                               channel_state.claimable_htlcs.retain(|&(ref payment_hash, _), htlcs| {
+                               channel_state.claimable_htlcs.retain(|payment_hash, htlcs| {
                                        htlcs.retain(|htlc| {
                                                // If height is approaching the number of blocks we think it takes us to get
                                                // our commitment transaction confirmed before the HTLC expires, plus the
@@ -3482,131 +3989,6 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                }
        }
 
-       /// Updates channel state to take note of transactions which were confirmed in the given block
-       /// at the given height.
-       ///
-       /// Note that you must still call (or have called) [`update_best_block`] with the block
-       /// information which is included here.
-       ///
-       /// This method may be called before or after [`update_best_block`] for a given block's
-       /// transaction data and may be called multiple times with additional transaction data for a
-       /// given block.
-       ///
-       /// This method may be called for a previous block after an [`update_best_block`] call has
-       /// been made for a later block, however it must *not* be called with transaction data from a
-       /// block which is no longer in the best chain (ie where [`update_best_block`] has already
-       /// been informed about a blockchain reorganization which no longer includes the block which
-       /// corresponds to `header`).
-       ///
-       /// [`update_best_block`]: `Self::update_best_block`
-       pub fn transactions_confirmed(&self, header: &BlockHeader, height: u32, txdata: &TransactionData) {
-               // Note that we MUST NOT end up calling methods on self.chain_monitor here - we're called
-               // during initialization prior to the chain_monitor being fully configured in some cases.
-               // See the docs for `ChannelManagerReadArgs` for more.
-
-               let block_hash = header.block_hash();
-               log_trace!(self.logger, "{} transactions included in block {} at height {} provided", txdata.len(), block_hash, height);
-
-               let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier);
-               self.do_chain_event(Some(height), |channel| channel.transactions_confirmed(&block_hash, height, txdata, &self.logger).map(|a| (a, Vec::new())));
-       }
-
-       /// Updates channel state with the current best blockchain tip. You should attempt to call this
-       /// quickly after a new block becomes available, however if multiple new blocks become
-       /// available at the same time, only a single `update_best_block()` call needs to be made.
-       ///
-       /// This method should also be called immediately after any block disconnections, once at the
-       /// reorganization fork point, and once with the new chain tip. Calling this method at the
-       /// blockchain reorganization fork point ensures we learn when a funding transaction which was
-       /// previously confirmed is reorganized out of the blockchain, ensuring we do not continue to
-       /// accept payments which cannot be enforced on-chain.
-       ///
-       /// In both the block-connection and block-disconnection case, this method may be called either
-       /// once per block connected or disconnected, or simply at the fork point and new tip(s),
-       /// skipping any intermediary blocks.
-       pub fn update_best_block(&self, header: &BlockHeader, height: u32) {
-               // Note that we MUST NOT end up calling methods on self.chain_monitor here - we're called
-               // during initialization prior to the chain_monitor being fully configured in some cases.
-               // See the docs for `ChannelManagerReadArgs` for more.
-
-               let block_hash = header.block_hash();
-               log_trace!(self.logger, "New best block: {} at height {}", block_hash, height);
-
-               let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier);
-
-               *self.best_block.write().unwrap() = BestBlock::new(block_hash, height);
-
-               self.do_chain_event(Some(height), |channel| channel.update_best_block(height, header.time));
-
-               loop {
-                       // Update last_node_announcement_serial to be the max of its current value and the
-                       // block timestamp. This should keep us close to the current time without relying on
-                       // having an explicit local time source.
-                       // Just in case we end up in a race, we loop until we either successfully update
-                       // last_node_announcement_serial or decide we don't need to.
-                       let old_serial = self.last_node_announcement_serial.load(Ordering::Acquire);
-                       if old_serial >= header.time as usize { break; }
-                       if self.last_node_announcement_serial.compare_exchange(old_serial, header.time as usize, Ordering::AcqRel, Ordering::Relaxed).is_ok() {
-                               break;
-                       }
-               }
-       }
-
-       /// Gets the set of txids which should be monitored for their confirmation state.
-       ///
-       /// If you're providing information about reorganizations via [`transaction_unconfirmed`], this
-       /// is the set of transactions which you may need to call [`transaction_unconfirmed`] for.
-       ///
-       /// This may be useful to poll to determine the set of transactions which must be registered
-       /// with an Electrum server or for which an Electrum server needs to be polled to determine
-       /// transaction confirmation state.
-       ///
-       /// This may update after any [`transactions_confirmed`] or [`block_connected`] call.
-       ///
-       /// Note that this is NOT the set of transactions which must be included in calls to
-       /// [`transactions_confirmed`] if they are confirmed, but a small subset of it.
-       ///
-       /// [`transactions_confirmed`]: Self::transactions_confirmed
-       /// [`transaction_unconfirmed`]: Self::transaction_unconfirmed
-       /// [`block_connected`]: chain::Listen::block_connected
-       pub fn get_relevant_txids(&self) -> Vec<Txid> {
-               let channel_state = self.channel_state.lock().unwrap();
-               let mut res = Vec::with_capacity(channel_state.short_to_id.len());
-               for chan in channel_state.by_id.values() {
-                       if let Some(funding_txo) = chan.get_funding_txo() {
-                               res.push(funding_txo.txid);
-                       }
-               }
-               res
-       }
-
-       /// Marks a transaction as having been reorganized out of the blockchain.
-       ///
-       /// If a transaction is included in [`get_relevant_txids`], and is no longer in the main branch
-       /// of the blockchain, this function should be called to indicate that the transaction should
-       /// be considered reorganized out.
-       ///
-       /// Once this is called, the given transaction will no longer appear on [`get_relevant_txids`],
-       /// though this may be called repeatedly for a given transaction without issue.
-       ///
-       /// Note that if the transaction is confirmed on the main chain in a different block (indicated
-       /// via a call to [`transactions_confirmed`]), it may re-appear in [`get_relevant_txids`], thus
-       /// be very wary of race-conditions wherein the final state of a transaction indicated via
-       /// these APIs is not the same as its state on the blockchain.
-       ///
-       /// [`transactions_confirmed`]: Self::transactions_confirmed
-       /// [`get_relevant_txids`]: Self::get_relevant_txids
-       pub fn transaction_unconfirmed(&self, txid: &Txid) {
-               let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier);
-               self.do_chain_event(None, |channel| {
-                       if let Some(funding_txo) = channel.get_funding_txo() {
-                               if funding_txo.txid == *txid {
-                                       channel.funding_transaction_unconfirmed().map(|_| (None, Vec::new()))
-                               } else { Ok((None, Vec::new())) }
-                       } else { Ok((None, Vec::new())) }
-               });
-       }
-
        /// Blocks until ChannelManager needs to be persisted or a timeout is reached. It returns a bool
        /// indicating whether persistence is necessary. Only one listener on
        /// `await_persistable_update` or `await_persistable_update_timeout` is guaranteed to be woken
@@ -3633,7 +4015,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
        }
 }
 
-impl<Signer: Sign, M: Deref + Sync + Send, T: Deref + Sync + Send, K: Deref + Sync + Send, F: Deref + Sync + Send, L: Deref + Sync + Send>
+impl<Signer: Sign, M: Deref , T: Deref , K: Deref , F: Deref , L: Deref >
        ChannelMessageHandler for ChannelManager<Signer, M, T, K, F, L>
        where M::Target: chain::Watch<Signer>,
         T::Target: BroadcasterInterface,
@@ -3642,94 +4024,93 @@ impl<Signer: Sign, M: Deref + Sync + Send, T: Deref + Sync + Send, K: Deref + Sy
         L::Target: Logger,
 {
        fn handle_open_channel(&self, counterparty_node_id: &PublicKey, their_features: InitFeatures, msg: &msgs::OpenChannel) {
-               let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier);
+               let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);
                let _ = handle_error!(self, self.internal_open_channel(counterparty_node_id, their_features, msg), *counterparty_node_id);
        }
 
        fn handle_accept_channel(&self, counterparty_node_id: &PublicKey, their_features: InitFeatures, msg: &msgs::AcceptChannel) {
-               let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier);
+               let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);
                let _ = handle_error!(self, self.internal_accept_channel(counterparty_node_id, their_features, msg), *counterparty_node_id);
        }
 
        fn handle_funding_created(&self, counterparty_node_id: &PublicKey, msg: &msgs::FundingCreated) {
-               let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier);
+               let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);
                let _ = handle_error!(self, self.internal_funding_created(counterparty_node_id, msg), *counterparty_node_id);
        }
 
        fn handle_funding_signed(&self, counterparty_node_id: &PublicKey, msg: &msgs::FundingSigned) {
-               let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier);
+               let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);
                let _ = handle_error!(self, self.internal_funding_signed(counterparty_node_id, msg), *counterparty_node_id);
        }
 
        fn handle_funding_locked(&self, counterparty_node_id: &PublicKey, msg: &msgs::FundingLocked) {
-               let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier);
+               let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);
                let _ = handle_error!(self, self.internal_funding_locked(counterparty_node_id, msg), *counterparty_node_id);
        }
 
        fn handle_shutdown(&self, counterparty_node_id: &PublicKey, their_features: &InitFeatures, msg: &msgs::Shutdown) {
-               let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier);
+               let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);
                let _ = handle_error!(self, self.internal_shutdown(counterparty_node_id, their_features, msg), *counterparty_node_id);
        }
 
        fn handle_closing_signed(&self, counterparty_node_id: &PublicKey, msg: &msgs::ClosingSigned) {
-               let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier);
+               let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);
                let _ = handle_error!(self, self.internal_closing_signed(counterparty_node_id, msg), *counterparty_node_id);
        }
 
        fn handle_update_add_htlc(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateAddHTLC) {
-               let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier);
+               let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);
                let _ = handle_error!(self, self.internal_update_add_htlc(counterparty_node_id, msg), *counterparty_node_id);
        }
 
        fn handle_update_fulfill_htlc(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateFulfillHTLC) {
-               let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier);
+               let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);
                let _ = handle_error!(self, self.internal_update_fulfill_htlc(counterparty_node_id, msg), *counterparty_node_id);
        }
 
        fn handle_update_fail_htlc(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateFailHTLC) {
-               let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier);
+               let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);
                let _ = handle_error!(self, self.internal_update_fail_htlc(counterparty_node_id, msg), *counterparty_node_id);
        }
 
        fn handle_update_fail_malformed_htlc(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateFailMalformedHTLC) {
-               let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier);
+               let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);
                let _ = handle_error!(self, self.internal_update_fail_malformed_htlc(counterparty_node_id, msg), *counterparty_node_id);
        }
 
        fn handle_commitment_signed(&self, counterparty_node_id: &PublicKey, msg: &msgs::CommitmentSigned) {
-               let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier);
+               let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);
                let _ = handle_error!(self, self.internal_commitment_signed(counterparty_node_id, msg), *counterparty_node_id);
        }
 
        fn handle_revoke_and_ack(&self, counterparty_node_id: &PublicKey, msg: &msgs::RevokeAndACK) {
-               let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier);
+               let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);
                let _ = handle_error!(self, self.internal_revoke_and_ack(counterparty_node_id, msg), *counterparty_node_id);
        }
 
        fn handle_update_fee(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateFee) {
-               let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier);
+               let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);
                let _ = handle_error!(self, self.internal_update_fee(counterparty_node_id, msg), *counterparty_node_id);
        }
 
        fn handle_announcement_signatures(&self, counterparty_node_id: &PublicKey, msg: &msgs::AnnouncementSignatures) {
-               let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier);
+               let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);
                let _ = handle_error!(self, self.internal_announcement_signatures(counterparty_node_id, msg), *counterparty_node_id);
        }
 
        fn handle_channel_update(&self, counterparty_node_id: &PublicKey, msg: &msgs::ChannelUpdate) {
-               let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier);
+               let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);
                let _ = handle_error!(self, self.internal_channel_update(counterparty_node_id, msg), *counterparty_node_id);
        }
 
        fn handle_channel_reestablish(&self, counterparty_node_id: &PublicKey, msg: &msgs::ChannelReestablish) {
-               let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier);
+               let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);
                let _ = handle_error!(self, self.internal_channel_reestablish(counterparty_node_id, msg), *counterparty_node_id);
        }
 
        fn peer_disconnected(&self, counterparty_node_id: &PublicKey, no_connection_possible: bool) {
-               let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier);
+               let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);
                let mut failed_channels = Vec::new();
-               let mut failed_payments = Vec::new();
                let mut no_channels_remain = true;
                {
                        let mut channel_state_lock = self.channel_state.lock().unwrap();
@@ -3758,16 +4139,7 @@ impl<Signer: Sign, M: Deref + Sync + Send, T: Deref + Sync + Send, K: Deref + Sy
                                log_debug!(self.logger, "Marking channels with {} disconnected and generating channel_updates", log_pubkey!(counterparty_node_id));
                                channel_state.by_id.retain(|_, chan| {
                                        if chan.get_counterparty_node_id() == *counterparty_node_id {
-                                               // Note that currently on channel reestablish we assert that there are no
-                                               // holding cell add-HTLCs, so if in the future we stop removing uncommitted HTLCs
-                                               // on peer disconnect here, there will need to be corresponding changes in
-                                               // reestablish logic.
-                                               let failed_adds = chan.remove_uncommitted_htlcs_and_mark_paused(&self.logger);
-                                               chan.to_disabled_marked();
-                                               if !failed_adds.is_empty() {
-                                                       let chan_update = self.get_channel_update(&chan).map(|u| u.encode_with_len()).unwrap(); // Cannot add/recv HTLCs before we have a short_id so unwrap is safe
-                                                       failed_payments.push((chan_update, failed_adds));
-                                               }
+                                               chan.remove_uncommitted_htlcs_and_mark_paused(&self.logger);
                                                if chan.is_shutdown() {
                                                        if let Some(short_id) = chan.get_short_channel_id() {
                                                                short_to_id.remove(&short_id);
@@ -3811,17 +4183,12 @@ impl<Signer: Sign, M: Deref + Sync + Send, T: Deref + Sync + Send, K: Deref + Sy
                for failure in failed_channels.drain(..) {
                        self.finish_force_close_channel(failure);
                }
-               for (chan_update, mut htlc_sources) in failed_payments {
-                       for (htlc_source, payment_hash) in htlc_sources.drain(..) {
-                               self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), htlc_source, &payment_hash, HTLCFailReason::Reason { failure_code: 0x1000 | 7, data: chan_update.clone() });
-                       }
-               }
        }
 
        fn peer_connected(&self, counterparty_node_id: &PublicKey, init_msg: &msgs::Init) {
                log_debug!(self.logger, "Generating channel_reestablish events for {}", log_pubkey!(counterparty_node_id));
 
-               let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier);
+               let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);
 
                {
                        let mut peer_state_lock = self.per_peer_state.write().unwrap();
@@ -3861,7 +4228,7 @@ impl<Signer: Sign, M: Deref + Sync + Send, T: Deref + Sync + Send, K: Deref + Sy
        }
 
        fn handle_error(&self, counterparty_node_id: &PublicKey, msg: &msgs::ErrorMessage) {
-               let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier);
+               let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);
 
                if msg.channel_id == [0; 32] {
                        for chan in self.list_channels() {
@@ -3896,6 +4263,10 @@ impl PersistenceNotifier {
                loop {
                        let &(ref mtx, ref cvar) = &self.persistence_lock;
                        let mut guard = mtx.lock().unwrap();
+                       if *guard {
+                               *guard = false;
+                               return;
+                       }
                        guard = cvar.wait(guard).unwrap();
                        let result = *guard;
                        if result {
@@ -3911,6 +4282,10 @@ impl PersistenceNotifier {
                loop {
                        let &(ref mtx, ref cvar) = &self.persistence_lock;
                        let mut guard = mtx.lock().unwrap();
+                       if *guard {
+                               *guard = false;
+                               return true;
+                       }
                        guard = cvar.wait_timeout(guard, max_wait).unwrap().0;
                        // Due to spurious wakeups that can happen on `wait_timeout`, here we need to check if the
                        // desired wait time has actually passed, and if not then restart the loop with a reduced wait
@@ -3943,215 +4318,89 @@ impl PersistenceNotifier {
 const SERIALIZATION_VERSION: u8 = 1;
 const MIN_SERIALIZATION_VERSION: u8 = 1;
 
-impl Writeable for PendingHTLCInfo {
-       fn write<W: Writer>(&self, writer: &mut W) -> Result<(), ::std::io::Error> {
-               match &self.routing {
-                       &PendingHTLCRouting::Forward { ref onion_packet, ref short_channel_id } => {
-                               0u8.write(writer)?;
-                               onion_packet.write(writer)?;
-                               short_channel_id.write(writer)?;
-                       },
-                       &PendingHTLCRouting::Receive { ref payment_data, ref incoming_cltv_expiry } => {
-                               1u8.write(writer)?;
-                               payment_data.write(writer)?;
-                               incoming_cltv_expiry.write(writer)?;
-                       },
-               }
-               self.incoming_shared_secret.write(writer)?;
-               self.payment_hash.write(writer)?;
-               self.amt_to_forward.write(writer)?;
-               self.outgoing_cltv_value.write(writer)?;
-               Ok(())
-       }
-}
-
-impl Readable for PendingHTLCInfo {
-       fn read<R: ::std::io::Read>(reader: &mut R) -> Result<PendingHTLCInfo, DecodeError> {
-               Ok(PendingHTLCInfo {
-                       routing: match Readable::read(reader)? {
-                               0u8 => PendingHTLCRouting::Forward {
-                                       onion_packet: Readable::read(reader)?,
-                                       short_channel_id: Readable::read(reader)?,
-                               },
-                               1u8 => PendingHTLCRouting::Receive {
-                                       payment_data: Readable::read(reader)?,
-                                       incoming_cltv_expiry: Readable::read(reader)?,
-                               },
-                               _ => return Err(DecodeError::InvalidValue),
-                       },
-                       incoming_shared_secret: Readable::read(reader)?,
-                       payment_hash: Readable::read(reader)?,
-                       amt_to_forward: Readable::read(reader)?,
-                       outgoing_cltv_value: Readable::read(reader)?,
-               })
-       }
-}
-
-impl Writeable for HTLCFailureMsg {
-       fn write<W: Writer>(&self, writer: &mut W) -> Result<(), ::std::io::Error> {
-               match self {
-                       &HTLCFailureMsg::Relay(ref fail_msg) => {
-                               0u8.write(writer)?;
-                               fail_msg.write(writer)?;
-                       },
-                       &HTLCFailureMsg::Malformed(ref fail_msg) => {
-                               1u8.write(writer)?;
-                               fail_msg.write(writer)?;
-                       }
-               }
-               Ok(())
-       }
-}
-
-impl Readable for HTLCFailureMsg {
-       fn read<R: ::std::io::Read>(reader: &mut R) -> Result<HTLCFailureMsg, DecodeError> {
-               match <u8 as Readable>::read(reader)? {
-                       0 => Ok(HTLCFailureMsg::Relay(Readable::read(reader)?)),
-                       1 => Ok(HTLCFailureMsg::Malformed(Readable::read(reader)?)),
-                       _ => Err(DecodeError::InvalidValue),
-               }
-       }
-}
-
-impl Writeable for PendingHTLCStatus {
-       fn write<W: Writer>(&self, writer: &mut W) -> Result<(), ::std::io::Error> {
-               match self {
-                       &PendingHTLCStatus::Forward(ref forward_info) => {
-                               0u8.write(writer)?;
-                               forward_info.write(writer)?;
-                       },
-                       &PendingHTLCStatus::Fail(ref fail_msg) => {
-                               1u8.write(writer)?;
-                               fail_msg.write(writer)?;
-                       }
-               }
-               Ok(())
-       }
-}
-
-impl Readable for PendingHTLCStatus {
-       fn read<R: ::std::io::Read>(reader: &mut R) -> Result<PendingHTLCStatus, DecodeError> {
-               match <u8 as Readable>::read(reader)? {
-                       0 => Ok(PendingHTLCStatus::Forward(Readable::read(reader)?)),
-                       1 => Ok(PendingHTLCStatus::Fail(Readable::read(reader)?)),
-                       _ => Err(DecodeError::InvalidValue),
-               }
-       }
-}
-
-impl_writeable!(HTLCPreviousHopData, 0, {
-       short_channel_id,
-       outpoint,
-       htlc_id,
-       incoming_packet_shared_secret
-});
-
-impl_writeable!(ClaimableHTLC, 0, {
-       prev_hop,
-       value,
-       payment_data,
-       cltv_expiry
-});
-
-impl Writeable for HTLCSource {
-       fn write<W: Writer>(&self, writer: &mut W) -> Result<(), ::std::io::Error> {
-               match self {
-                       &HTLCSource::PreviousHopData(ref hop_data) => {
-                               0u8.write(writer)?;
-                               hop_data.write(writer)?;
-                       },
-                       &HTLCSource::OutboundRoute { ref path, ref session_priv, ref first_hop_htlc_msat } => {
-                               1u8.write(writer)?;
-                               path.write(writer)?;
-                               session_priv.write(writer)?;
-                               first_hop_htlc_msat.write(writer)?;
-                       }
-               }
-               Ok(())
-       }
-}
-
-impl Readable for HTLCSource {
-       fn read<R: ::std::io::Read>(reader: &mut R) -> Result<HTLCSource, DecodeError> {
-               match <u8 as Readable>::read(reader)? {
-                       0 => Ok(HTLCSource::PreviousHopData(Readable::read(reader)?)),
-                       1 => Ok(HTLCSource::OutboundRoute {
-                               path: Readable::read(reader)?,
-                               session_priv: Readable::read(reader)?,
-                               first_hop_htlc_msat: Readable::read(reader)?,
-                       }),
-                       _ => Err(DecodeError::InvalidValue),
-               }
-       }
-}
-
-impl Writeable for HTLCFailReason {
-       fn write<W: Writer>(&self, writer: &mut W) -> Result<(), ::std::io::Error> {
-               match self {
-                       &HTLCFailReason::LightningError { ref err } => {
-                               0u8.write(writer)?;
-                               err.write(writer)?;
-                       },
-                       &HTLCFailReason::Reason { ref failure_code, ref data } => {
-                               1u8.write(writer)?;
-                               failure_code.write(writer)?;
-                               data.write(writer)?;
-                       }
-               }
-               Ok(())
-       }
-}
-
-impl Readable for HTLCFailReason {
-       fn read<R: ::std::io::Read>(reader: &mut R) -> Result<HTLCFailReason, DecodeError> {
-               match <u8 as Readable>::read(reader)? {
-                       0 => Ok(HTLCFailReason::LightningError { err: Readable::read(reader)? }),
-                       1 => Ok(HTLCFailReason::Reason {
-                               failure_code: Readable::read(reader)?,
-                               data: Readable::read(reader)?,
-                       }),
-                       _ => Err(DecodeError::InvalidValue),
-               }
-       }
-}
-
-impl Writeable for HTLCForwardInfo {
-       fn write<W: Writer>(&self, writer: &mut W) -> Result<(), ::std::io::Error> {
-               match self {
-                       &HTLCForwardInfo::AddHTLC { ref prev_short_channel_id, ref prev_funding_outpoint, ref prev_htlc_id, ref forward_info } => {
-                               0u8.write(writer)?;
-                               prev_short_channel_id.write(writer)?;
-                               prev_funding_outpoint.write(writer)?;
-                               prev_htlc_id.write(writer)?;
-                               forward_info.write(writer)?;
-                       },
-                       &HTLCForwardInfo::FailHTLC { ref htlc_id, ref err_packet } => {
-                               1u8.write(writer)?;
-                               htlc_id.write(writer)?;
-                               err_packet.write(writer)?;
-                       },
-               }
-               Ok(())
-       }
-}
-
-impl Readable for HTLCForwardInfo {
-       fn read<R: ::std::io::Read>(reader: &mut R) -> Result<HTLCForwardInfo, DecodeError> {
-               match <u8 as Readable>::read(reader)? {
-                       0 => Ok(HTLCForwardInfo::AddHTLC {
-                               prev_short_channel_id: Readable::read(reader)?,
-                               prev_funding_outpoint: Readable::read(reader)?,
-                               prev_htlc_id: Readable::read(reader)?,
-                               forward_info: Readable::read(reader)?,
-                       }),
-                       1 => Ok(HTLCForwardInfo::FailHTLC {
-                               htlc_id: Readable::read(reader)?,
-                               err_packet: Readable::read(reader)?,
-                       }),
-                       _ => Err(DecodeError::InvalidValue),
-               }
-       }
-}
+impl_writeable_tlv_based_enum!(PendingHTLCRouting,
+       (0, Forward) => {
+               (0, onion_packet),
+               (2, short_channel_id),
+       }, {}, {},
+       (1, Receive) => {
+               (0, payment_data),
+               (2, incoming_cltv_expiry),
+       }, {}, {}
+;);
+
+impl_writeable_tlv_based!(PendingHTLCInfo, {
+       (0, routing),
+       (2, incoming_shared_secret),
+       (4, payment_hash),
+       (6, amt_to_forward),
+       (8, outgoing_cltv_value)
+}, {}, {});
+
+impl_writeable_tlv_based_enum!(HTLCFailureMsg, ;
+       (0, Relay),
+       (1, Malformed),
+);
+impl_writeable_tlv_based_enum!(PendingHTLCStatus, ;
+       (0, Forward),
+       (1, Fail),
+);
+
+impl_writeable_tlv_based!(HTLCPreviousHopData, {
+       (0, short_channel_id),
+       (2, outpoint),
+       (4, htlc_id),
+       (6, incoming_packet_shared_secret)
+}, {}, {});
+
+impl_writeable_tlv_based!(ClaimableHTLC, {
+       (0, prev_hop),
+       (2, value),
+       (4, payment_data),
+       (6, cltv_expiry),
+}, {}, {});
+
+impl_writeable_tlv_based_enum!(HTLCSource,
+       (0, OutboundRoute) => {
+               (0, session_priv),
+               (2, first_hop_htlc_msat),
+       }, {}, {
+               (4, path),
+       };
+       (1, PreviousHopData)
+);
+
+impl_writeable_tlv_based_enum!(HTLCFailReason,
+       (0, LightningError) => {
+               (0, err),
+       }, {}, {},
+       (1, Reason) => {
+               (0, failure_code),
+       }, {}, {
+               (2, data),
+       },
+;);
+
+impl_writeable_tlv_based_enum!(HTLCForwardInfo,
+       (0, AddHTLC) => {
+               (0, forward_info),
+               (2, prev_short_channel_id),
+               (4, prev_htlc_id),
+               (6, prev_funding_outpoint),
+       }, {}, {},
+       (1, FailHTLC) => {
+               (0, htlc_id),
+               (2, err_packet),
+       }, {}, {},
+;);
+
+impl_writeable_tlv_based!(PendingInboundPayment, {
+       (0, payment_secret),
+       (2, expiry_time),
+       (4, user_payment_id),
+       (6, payment_preimage),
+       (8, min_value_msat),
+}, {}, {});
 
 impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> Writeable for ChannelManager<Signer, M, T, K, F, L>
        where M::Target: chain::Watch<Signer>,
@@ -4163,8 +4412,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> Writeable f
        fn write<W: Writer>(&self, writer: &mut W) -> Result<(), ::std::io::Error> {
                let _consistency_lock = self.total_consistency_lock.write().unwrap();
 
-               writer.write_all(&[SERIALIZATION_VERSION; 1])?;
-               writer.write_all(&[MIN_SERIALIZATION_VERSION; 1])?;
+               write_ver_prefix!(writer, SERIALIZATION_VERSION, MIN_SERIALIZATION_VERSION);
 
                self.genesis_hash.write(writer)?;
                {
@@ -4232,6 +4480,22 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> Writeable f
                }
 
                (self.last_node_announcement_serial.load(Ordering::Acquire) as u32).write(writer)?;
+               (self.highest_seen_timestamp.load(Ordering::Acquire) as u32).write(writer)?;
+
+               let pending_inbound_payments = self.pending_inbound_payments.lock().unwrap();
+               (pending_inbound_payments.len() as u64).write(writer)?;
+               for (hash, pending_payment) in pending_inbound_payments.iter() {
+                       hash.write(writer)?;
+                       pending_payment.write(writer)?;
+               }
+
+               let pending_outbound_payments = self.pending_outbound_payments.lock().unwrap();
+               (pending_outbound_payments.len() as u64).write(writer)?;
+               for session_priv in pending_outbound_payments.iter() {
+                       session_priv.write(writer)?;
+               }
+
+               write_tlv_fields!(writer, {}, {});
 
                Ok(())
        }
@@ -4356,11 +4620,7 @@ impl<'a, Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
         L::Target: Logger,
 {
        fn read<R: ::std::io::Read>(reader: &mut R, mut args: ChannelManagerReadArgs<'a, Signer, M, T, K, F, L>) -> Result<Self, DecodeError> {
-               let _ver: u8 = Readable::read(reader)?;
-               let min_ver: u8 = Readable::read(reader)?;
-               if min_ver > SERIALIZATION_VERSION {
-                       return Err(DecodeError::UnknownVersion);
-               }
+               let _ver = read_ver_prefix!(reader, SERIALIZATION_VERSION);
 
                let genesis_hash: BlockHash = Readable::read(reader)?;
                let best_block_height: u32 = Readable::read(reader)?;
@@ -4462,6 +4722,25 @@ impl<'a, Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
                }
 
                let last_node_announcement_serial: u32 = Readable::read(reader)?;
+               let highest_seen_timestamp: u32 = Readable::read(reader)?;
+
+               let pending_inbound_payment_count: u64 = Readable::read(reader)?;
+               let mut pending_inbound_payments: HashMap<PaymentHash, PendingInboundPayment> = HashMap::with_capacity(cmp::min(pending_inbound_payment_count as usize, MAX_ALLOC_SIZE/(3*32)));
+               for _ in 0..pending_inbound_payment_count {
+                       if pending_inbound_payments.insert(Readable::read(reader)?, Readable::read(reader)?).is_some() {
+                               return Err(DecodeError::InvalidValue);
+                       }
+               }
+
+               let pending_outbound_payments_count: u64 = Readable::read(reader)?;
+               let mut pending_outbound_payments: HashSet<[u8; 32]> = HashSet::with_capacity(cmp::min(pending_outbound_payments_count as usize, MAX_ALLOC_SIZE/32));
+               for _ in 0..pending_outbound_payments_count {
+                       if !pending_outbound_payments.insert(Readable::read(reader)?) {
+                               return Err(DecodeError::InvalidValue);
+                       }
+               }
+
+               read_tlv_fields!(reader, {}, {});
 
                let mut secp_ctx = Secp256k1::new();
                secp_ctx.seeded_randomize(&args.keys_manager.get_secure_random_bytes());
@@ -4481,11 +4760,15 @@ impl<'a, Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
                                claimable_htlcs,
                                pending_msg_events: Vec::new(),
                        }),
+                       pending_inbound_payments: Mutex::new(pending_inbound_payments),
+                       pending_outbound_payments: Mutex::new(pending_outbound_payments),
+
                        our_network_key: args.keys_manager.get_node_secret(),
                        our_network_pubkey: PublicKey::from_secret_key(&secp_ctx, &args.keys_manager.get_node_secret()),
                        secp_ctx,
 
                        last_node_announcement_serial: AtomicUsize::new(last_node_announcement_serial as usize),
+                       highest_seen_timestamp: AtomicUsize::new(highest_seen_timestamp as usize),
 
                        per_peer_state: RwLock::new(per_peer_state),
 
@@ -4514,9 +4797,9 @@ impl<'a, Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
 mod tests {
        use ln::channelmanager::PersistenceNotifier;
        use std::sync::Arc;
-       use std::sync::atomic::{AtomicBool, Ordering};
+       use core::sync::atomic::{AtomicBool, Ordering};
        use std::thread;
-       use std::time::Duration;
+       use core::time::Duration;
 
        #[test]
        fn test_wait_timeout() {
@@ -4568,20 +4851,20 @@ pub mod bench {
        use chain::channelmonitor::Persist;
        use chain::keysinterface::{KeysManager, InMemorySigner};
        use ln::channelmanager::{BestBlock, ChainParameters, ChannelManager, PaymentHash, PaymentPreimage};
-       use ln::features::InitFeatures;
+       use ln::features::{InitFeatures, InvoiceFeatures};
        use ln::functional_test_utils::*;
        use ln::msgs::ChannelMessageHandler;
        use routing::network_graph::NetworkGraph;
        use routing::router::get_route;
        use util::test_utils;
        use util::config::UserConfig;
-       use util::events::{Event, EventsProvider, MessageSendEvent, MessageSendEventsProvider};
+       use util::events::{Event, MessageSendEvent, MessageSendEventsProvider};
 
        use bitcoin::hashes::Hash;
        use bitcoin::hashes::sha256::Hash as Sha256;
        use bitcoin::{Block, BlockHeader, Transaction, TxOut};
 
-       use std::sync::Mutex;
+       use std::sync::{Arc, Mutex};
 
        use test::Bencher;
 
@@ -4607,7 +4890,7 @@ pub mod bench {
                let network = bitcoin::Network::Testnet;
                let genesis_hash = bitcoin::blockdata::constants::genesis_block(network).header.block_hash();
 
-               let tx_broadcaster = test_utils::TestBroadcaster{txn_broadcasted: Mutex::new(Vec::new())};
+               let tx_broadcaster = test_utils::TestBroadcaster{txn_broadcasted: Mutex::new(Vec::new()), blocks: Arc::new(Mutex::new(Vec::new()))};
                let fee_estimator = test_utils::TestFeeEstimator { sat_per_kw: 253 };
 
                let mut config: UserConfig = Default::default();
@@ -4662,15 +4945,20 @@ pub mod bench {
 
                let dummy_graph = NetworkGraph::new(genesis_hash);
 
+               let mut payment_count: u64 = 0;
                macro_rules! send_payment {
                        ($node_a: expr, $node_b: expr) => {
                                let usable_channels = $node_a.list_usable_channels();
-                               let route = get_route(&$node_a.get_our_node_id(), &dummy_graph, &$node_b.get_our_node_id(), None, Some(&usable_channels.iter().map(|r| r).collect::<Vec<_>>()), &[], 10_000, TEST_FINAL_CLTV, &logger_a).unwrap();
+                               let route = get_route(&$node_a.get_our_node_id(), &dummy_graph, &$node_b.get_our_node_id(), Some(InvoiceFeatures::known()),
+                                       Some(&usable_channels.iter().map(|r| r).collect::<Vec<_>>()), &[], 10_000, TEST_FINAL_CLTV, &logger_a).unwrap();
 
-                               let payment_preimage = PaymentPreimage([0; 32]);
+                               let mut payment_preimage = PaymentPreimage([0; 32]);
+                               payment_preimage.0[0..8].copy_from_slice(&payment_count.to_le_bytes());
+                               payment_count += 1;
                                let payment_hash = PaymentHash(Sha256::hash(&payment_preimage.0[..]).into_inner());
+                               let payment_secret = $node_b.create_inbound_payment_for_hash(payment_hash, None, 7200, 0).unwrap();
 
-                               $node_a.send_payment(&route, payment_hash, &None).unwrap();
+                               $node_a.send_payment(&route, payment_hash, &Some(payment_secret)).unwrap();
                                let payment_event = SendEvent::from_event($node_a.get_and_clear_pending_msg_events().pop().unwrap());
                                $node_b.handle_update_add_htlc(&$node_a.get_our_node_id(), &payment_event.msgs[0]);
                                $node_b.handle_commitment_signed(&$node_a.get_our_node_id(), &payment_event.commitment_msg);
@@ -4680,8 +4968,8 @@ pub mod bench {
                                $node_b.handle_revoke_and_ack(&$node_a.get_our_node_id(), &get_event_msg!(NodeHolder { node: &$node_a }, MessageSendEvent::SendRevokeAndACK, $node_b.get_our_node_id()));
 
                                expect_pending_htlcs_forwardable!(NodeHolder { node: &$node_b });
-                               expect_payment_received!(NodeHolder { node: &$node_b }, payment_hash, 10_000);
-                               assert!($node_b.claim_funds(payment_preimage, &None, 10_000));
+                               expect_payment_received!(NodeHolder { node: &$node_b }, payment_hash, payment_secret, 10_000);
+                               assert!($node_b.claim_funds(payment_preimage));
 
                                match $node_b.get_and_clear_pending_msg_events().pop().unwrap() {
                                        MessageSendEvent::UpdateHTLCs { node_id, updates } => {