Merge pull request #1004 from TheBlueMatt/2021-07-forward-event
author    Matt Corallo <649246+TheBlueMatt@users.noreply.github.com>
          Wed, 4 Aug 2021 22:58:14 +0000 (22:58 +0000)
committer GitHub <noreply@github.com>
          Wed, 4 Aug 2021 22:58:14 +0000 (22:58 +0000)
Add a `PaymentForwarded` Event
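
For context, a minimal sketch of how a downstream event consumer might handle the new variant. The `Event` enum and `handle_event` function here are stand-ins written for this description, not the crate's actual API surface; only the `PaymentForwarded { fee_earned_msat, claim_from_onchain_tx }` field shape is taken from the event construction in `claim_funds_internal` and the `expect_payment_forwarded!` test macro in the diff below.

// Sketch only: illustrates handling of the new event; the enum is a hypothetical
// stand-in mirroring the fields added in this PR, not lightning::util::events.
enum Event {
	PaymentForwarded {
		// Fee earned by this node for the forward, if it could be computed. Per the
		// diff, this may be None e.g. when the inbound (previous-hop) channel was
		// force-closed and the claimed HTLC value was not available.
		fee_earned_msat: Option<u64>,
		// True when the preimage was learned from an on-chain transaction rather
		// than an off-chain update_fulfill_htlc.
		claim_from_onchain_tx: bool,
	},
	// Stand-in for the other Event variants, elided here.
	Other,
}

fn handle_event(event: Event) {
	match event {
		Event::PaymentForwarded { fee_earned_msat, claim_from_onchain_tx } => {
			let via = if claim_from_onchain_tx { "on-chain claim" } else { "off-chain fulfill" };
			match fee_earned_msat {
				Some(fee) => println!("Forwarded a payment via {}, earning {} msat", via, fee),
				None => println!("Forwarded a payment via {}, fee unknown", via),
			}
		},
		Event::Other => {},
	}
}

fn main() {
	// e.g. a forward that earned the 1000 msat base fee, claimed off-chain as in the
	// functional tests below.
	handle_event(Event::PaymentForwarded { fee_earned_msat: Some(1000), claim_from_onchain_tx: false });
}
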

lightning/src/chain/channelmonitor.rs
lightning/src/ln/chanmon_update_fail_tests.rs
lightning/src/ln/channel.rs
lightning/src/ln/channelmanager.rs
lightning/src/ln/functional_test_utils.rs
lightning/src/ln/functional_tests.rs
lightning/src/ln/reorg_tests.rs
lightning/src/util/events.rs

index 6e2ac3fcf512ed3364fd19af3912bbf350de2c68,cec3f41c04ba5447eef797d6db9c0f220ab1b4c5..7904d9bdefa4935a68ab44e04722fe5d5a220dc2
@@@ -53,7 -53,7 +53,7 @@@ use util::events::Event
  
  use prelude::*;
  use core::{cmp, mem};
 -use std::io::Error;
 +use io::{self, Error};
  use core::ops::Deref;
  use sync::Mutex;
  
@@@ -88,7 -88,7 +88,7 @@@ pub struct ChannelMonitorUpdate 
  pub const CLOSED_CHANNEL_UPDATE_ID: u64 = core::u64::MAX;
  
  impl Writeable for ChannelMonitorUpdate {
 -      fn write<W: Writer>(&self, w: &mut W) -> Result<(), ::std::io::Error> {
 +      fn write<W: Writer>(&self, w: &mut W) -> Result<(), io::Error> {
                write_ver_prefix!(w, SERIALIZATION_VERSION, MIN_SERIALIZATION_VERSION);
                self.update_id.write(w)?;
                (self.updates.len() as u64).write(w)?;
        }
  }
  impl Readable for ChannelMonitorUpdate {
 -      fn read<R: ::std::io::Read>(r: &mut R) -> Result<Self, DecodeError> {
 +      fn read<R: io::Read>(r: &mut R) -> Result<Self, DecodeError> {
                let _ver = read_ver_prefix!(r, SERIALIZATION_VERSION);
                let update_id: u64 = Readable::read(r)?;
                let len: u64 = Readable::read(r)?;
@@@ -199,10 -199,12 +199,12 @@@ pub enum MonitorEvent 
  pub struct HTLCUpdate {
        pub(crate) payment_hash: PaymentHash,
        pub(crate) payment_preimage: Option<PaymentPreimage>,
-       pub(crate) source: HTLCSource
+       pub(crate) source: HTLCSource,
+       pub(crate) onchain_value_satoshis: Option<u64>,
  }
  impl_writeable_tlv_based!(HTLCUpdate, {
        (0, payment_hash, required),
+       (1, onchain_value_satoshis, option),
        (2, source, required),
        (4, payment_preimage, option),
  });
@@@ -293,7 -295,7 +295,7 @@@ struct CounterpartyCommitmentTransactio
  }
  
  impl Writeable for CounterpartyCommitmentTransaction {
 -      fn write<W: Writer>(&self, w: &mut W) -> Result<(), ::std::io::Error> {
 +      fn write<W: Writer>(&self, w: &mut W) -> Result<(), io::Error> {
                w.write_all(&byte_utils::be64_to_array(self.per_htlc.len() as u64))?;
                for (ref txid, ref htlcs) in self.per_htlc.iter() {
                        w.write_all(&txid[..])?;
        }
  }
  impl Readable for CounterpartyCommitmentTransaction {
 -      fn read<R: ::std::io::Read>(r: &mut R) -> Result<Self, DecodeError> {
 +      fn read<R: io::Read>(r: &mut R) -> Result<Self, DecodeError> {
                let counterparty_commitment_transaction = {
                        let per_htlc_len: u64 = Readable::read(r)?;
                        let mut per_htlc = HashMap::with_capacity(cmp::min(per_htlc_len as usize, MAX_ALLOC_SIZE / 64));
@@@ -385,6 -387,7 +387,7 @@@ enum OnchainEvent 
        HTLCUpdate {
                source: HTLCSource,
                payment_hash: PaymentHash,
+               onchain_value_satoshis: Option<u64>,
        },
        MaturingOutput {
                descriptor: SpendableOutputDescriptor,
@@@ -400,6 -403,7 +403,7 @@@ impl_writeable_tlv_based!(OnchainEventE
  impl_writeable_tlv_based_enum!(OnchainEvent,
        (0, HTLCUpdate) => {
                (0, source, required),
+               (1, onchain_value_satoshis, option),
                (2, payment_hash, required),
        },
        (1, MaturingOutput) => {
@@@ -1574,6 -1578,7 +1578,7 @@@ impl<Signer: Sign> ChannelMonitorImpl<S
                                                                                event: OnchainEvent::HTLCUpdate {
                                                                                        source: (**source).clone(),
                                                                                        payment_hash: htlc.payment_hash.clone(),
+                                                                                       onchain_value_satoshis: Some(htlc.amount_msat / 1000),
                                                                                },
                                                                        };
                                                                        log_info!(logger, "Failing HTLC with payment_hash {} from {} counterparty commitment tx due to broadcast of revoked counterparty commitment transaction, waiting for confirmation (at height {})", log_bytes!(htlc.payment_hash.0), $commitment_tx, entry.confirmation_threshold());
                                                                        event: OnchainEvent::HTLCUpdate {
                                                                                source: (**source).clone(),
                                                                                payment_hash: htlc.payment_hash.clone(),
+                                                                               onchain_value_satoshis: Some(htlc.amount_msat / 1000),
                                                                        },
                                                                });
                                                        }
                let mut claim_requests = Vec::new();
                let mut watch_outputs = Vec::new();
  
-               macro_rules! wait_threshold_conf {
-                       ($source: expr, $commitment_tx: expr, $payment_hash: expr) => {
-                               self.onchain_events_awaiting_threshold_conf.retain(|ref entry| {
-                                       if entry.height != height { return true; }
-                                       match entry.event {
-                                               OnchainEvent::HTLCUpdate { source: ref update_source, .. } => {
-                                                       *update_source != $source
-                                               },
-                                               _ => true,
-                                       }
-                               });
-                               let entry = OnchainEventEntry {
-                                       txid: commitment_txid,
-                                       height,
-                                       event: OnchainEvent::HTLCUpdate { source: $source, payment_hash: $payment_hash },
-                               };
-                               log_trace!(logger, "Failing HTLC with payment_hash {} from {} holder commitment tx due to broadcast of transaction, waiting confirmation (at height{})", log_bytes!($payment_hash.0), $commitment_tx, entry.confirmation_threshold());
-                               self.onchain_events_awaiting_threshold_conf.push(entry);
-                       }
-               }
                macro_rules! append_onchain_update {
                        ($updates: expr, $to_watch: expr) => {
                                claim_requests = $updates.0;
                }
  
                macro_rules! fail_dust_htlcs_after_threshold_conf {
-                       ($holder_tx: expr) => {
+                       ($holder_tx: expr, $commitment_tx: expr) => {
                                for &(ref htlc, _, ref source) in &$holder_tx.htlc_outputs {
                                        if htlc.transaction_output_index.is_none() {
                                                if let &Some(ref source) = source {
-                                                       wait_threshold_conf!(source.clone(), "lastest", htlc.payment_hash.clone());
+                                                       self.onchain_events_awaiting_threshold_conf.retain(|ref entry| {
+                                                               if entry.height != height { return true; }
+                                                               match entry.event {
+                                                                       OnchainEvent::HTLCUpdate { source: ref update_source, .. } => {
+                                                                               update_source != source
+                                                                       },
+                                                                       _ => true,
+                                                               }
+                                                       });
+                                                       let entry = OnchainEventEntry {
+                                                               txid: commitment_txid,
+                                                               height,
+                                                               event: OnchainEvent::HTLCUpdate {
+                                                                       source: source.clone(), payment_hash: htlc.payment_hash,
+                                                                       onchain_value_satoshis: Some(htlc.amount_msat / 1000)
+                                                               },
+                                                       };
+                                                       log_trace!(logger, "Failing HTLC with payment_hash {} from {} holder commitment tx due to broadcast of transaction, waiting confirmation (at height{})",
+                                                               log_bytes!(htlc.payment_hash.0), $commitment_tx, entry.confirmation_threshold());
+                                                       self.onchain_events_awaiting_threshold_conf.push(entry);
                                                }
                                        }
                                }
                }
  
                if is_holder_tx {
-                       fail_dust_htlcs_after_threshold_conf!(self.current_holder_commitment_tx);
+                       fail_dust_htlcs_after_threshold_conf!(self.current_holder_commitment_tx, "latest");
                        if let &Some(ref holder_tx) = &self.prev_holder_signed_commitment_tx {
-                               fail_dust_htlcs_after_threshold_conf!(holder_tx);
+                               fail_dust_htlcs_after_threshold_conf!(holder_tx, "previous");
                        }
                }
  
                // Produce actionable events from on-chain events having reached their threshold.
                for entry in onchain_events_reaching_threshold_conf.drain(..) {
                        match entry.event {
-                               OnchainEvent::HTLCUpdate { ref source, payment_hash } => {
+                               OnchainEvent::HTLCUpdate { ref source, payment_hash, onchain_value_satoshis } => {
                                        // Check for duplicate HTLC resolutions.
                                        #[cfg(debug_assertions)]
                                        {
  
                                        log_debug!(logger, "HTLC {} failure update has got enough confirmations to be passed upstream", log_bytes!(payment_hash.0));
                                        self.pending_monitor_events.push(MonitorEvent::HTLCEvent(HTLCUpdate {
-                                               payment_hash: payment_hash,
+                                               payment_hash,
                                                payment_preimage: None,
                                                source: source.clone(),
+                                               onchain_value_satoshis,
                                        }));
                                },
                                OnchainEvent::MaturingOutput { descriptor } => {
                                                        if pending_htlc.payment_hash == $htlc_output.payment_hash && pending_htlc.amount_msat == $htlc_output.amount_msat {
                                                                if let &Some(ref source) = pending_source {
                                                                        log_claim!("revoked counterparty commitment tx", false, pending_htlc, true);
-                                                                       payment_data = Some(((**source).clone(), $htlc_output.payment_hash));
+                                                                       payment_data = Some(((**source).clone(), $htlc_output.payment_hash, $htlc_output.amount_msat));
                                                                        break;
                                                                }
                                                        }
                                                                // transaction. This implies we either learned a preimage, the HTLC
                                                                // has timed out, or we screwed up. In any case, we should now
                                                                // resolve the source HTLC with the original sender.
-                                                               payment_data = Some(((*source).clone(), htlc_output.payment_hash));
+                                                               payment_data = Some(((*source).clone(), htlc_output.payment_hash, htlc_output.amount_msat));
                                                        } else if !$holder_tx {
                                                                        check_htlc_valid_counterparty!(self.current_counterparty_commitment_txid, htlc_output);
                                                                if payment_data.is_none() {
  
                        // Check that scan_commitment, above, decided there is some source worth relaying an
                        // HTLC resolution backwards to and figure out whether we learned a preimage from it.
-                       if let Some((source, payment_hash)) = payment_data {
+                       if let Some((source, payment_hash, amount_msat)) = payment_data {
                                let mut payment_preimage = PaymentPreimage([0; 32]);
                                if accepted_preimage_claim {
                                        if !self.pending_monitor_events.iter().any(
                                                self.pending_monitor_events.push(MonitorEvent::HTLCEvent(HTLCUpdate {
                                                        source,
                                                        payment_preimage: Some(payment_preimage),
-                                                       payment_hash
+                                                       payment_hash,
+                                                       onchain_value_satoshis: Some(amount_msat / 1000),
                                                }));
                                        }
                                } else if offered_preimage_claim {
                                                self.pending_monitor_events.push(MonitorEvent::HTLCEvent(HTLCUpdate {
                                                        source,
                                                        payment_preimage: Some(payment_preimage),
-                                                       payment_hash
+                                                       payment_hash,
+                                                       onchain_value_satoshis: Some(amount_msat / 1000),
                                                }));
                                        }
                                } else {
                                        let entry = OnchainEventEntry {
                                                txid: tx.txid(),
                                                height,
-                                               event: OnchainEvent::HTLCUpdate { source: source, payment_hash: payment_hash },
+                                               event: OnchainEvent::HTLCUpdate {
+                                                       source, payment_hash,
+                                                       onchain_value_satoshis: Some(amount_msat / 1000),
+                                               },
                                        };
                                        log_info!(logger, "Failing HTLC with payment_hash {} timeout by a spend tx, waiting for confirmation (at height {})", log_bytes!(payment_hash.0), entry.confirmation_threshold());
                                        self.onchain_events_awaiting_threshold_conf.push(entry);
                                        output: outp.clone(),
                                });
                                break;
 -                      } else if let Some(ref broadcasted_holder_revokable_script) = self.broadcasted_holder_revokable_script {
 +                      }
 +                      if let Some(ref broadcasted_holder_revokable_script) = self.broadcasted_holder_revokable_script {
                                if broadcasted_holder_revokable_script.0 == outp.script_pubkey {
                                        spendable_output =  Some(SpendableOutputDescriptor::DelayedPaymentOutput(DelayedPaymentOutputDescriptor {
                                                outpoint: OutPoint { txid: tx.txid(), index: i as u16 },
                                        }));
                                        break;
                                }
 -                      } else if self.counterparty_payment_script == outp.script_pubkey {
 +                      }
 +                      if self.counterparty_payment_script == outp.script_pubkey {
                                spendable_output = Some(SpendableOutputDescriptor::StaticPaymentOutput(StaticPaymentOutputDescriptor {
                                        outpoint: OutPoint { txid: tx.txid(), index: i as u16 },
                                        output: outp.clone(),
                                        channel_value_satoshis: self.channel_value_satoshis,
                                }));
                                break;
 -                      } else if outp.script_pubkey == self.shutdown_script {
 +                      }
 +                      if outp.script_pubkey == self.shutdown_script {
                                spendable_output = Some(SpendableOutputDescriptor::StaticOutput {
                                        outpoint: OutPoint { txid: tx.txid(), index: i as u16 },
                                        output: outp.clone(),
                                });
 +                              break;
                        }
                }
                if let Some(spendable_output) = spendable_output {
@@@ -2585,7 -2591,7 +2595,7 @@@ const MAX_ALLOC_SIZE: usize = 64*1024
  
  impl<'a, Signer: Sign, K: KeysInterface<Signer = Signer>> ReadableArgs<&'a K>
                for (BlockHash, ChannelMonitor<Signer>) {
 -      fn read<R: ::std::io::Read>(reader: &mut R, keys_manager: &'a K) -> Result<Self, DecodeError> {
 +      fn read<R: io::Read>(reader: &mut R, keys_manager: &'a K) -> Result<Self, DecodeError> {
                macro_rules! unwrap_obj {
                        ($key: expr) => {
                                match $key {
index 00b6813411c657fc65656a517b3dc9fc11e767d4,c3a2f5af3b539eb03cfa59cd34a870632f18c2f0..720190ec85523237bbc9a0e44e581b09b05586a7
@@@ -40,7 -40,6 +40,7 @@@ use ln::functional_test_utils::*
  
  use util::test_utils;
  
 +use io;
  use prelude::*;
  use sync::{Arc, Mutex};
  
@@@ -123,7 -122,7 +123,7 @@@ fn test_monitor_and_persister_update_fa
                let mut w = test_utils::TestVecWriter(Vec::new());
                monitor.write(&mut w).unwrap();
                let new_monitor = <(BlockHash, ChannelMonitor<EnforcingSigner>)>::read(
 -                      &mut ::std::io::Cursor::new(&w.0), &test_utils::OnlyReadsKeysInterface {}).unwrap().1;
 +                      &mut io::Cursor::new(&w.0), &test_utils::OnlyReadsKeysInterface {}).unwrap().1;
                assert!(new_monitor == *monitor);
                let chain_mon = test_utils::TestChainMonitor::new(Some(&chain_source), &tx_broadcaster, &logger, &chanmon_cfgs[0].fee_estimator, &persister, &node_cfgs[0].keys_manager);
                assert!(chain_mon.watch_channel(outpoint, new_monitor).is_ok());
@@@ -1160,6 -1159,7 +1160,7 @@@ fn test_monitor_update_fail_reestablish
        assert!(updates.update_fee.is_none());
        assert_eq!(updates.update_fulfill_htlcs.len(), 1);
        nodes[1].node.handle_update_fulfill_htlc(&nodes[2].node.get_our_node_id(), &updates.update_fulfill_htlcs[0]);
+       expect_payment_forwarded!(nodes[1], Some(1000), false);
        check_added_monitors!(nodes[1], 1);
        assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty());
        commitment_signed_dance!(nodes[1], nodes[2], updates.commitment_signed, false);
@@@ -2318,6 -2318,7 +2319,7 @@@ fn do_test_reconnect_dup_htlc_claims(ht
                assert_eq!(fulfill_msg, cs_updates.update_fulfill_htlcs[0]);
        }
        nodes[1].node.handle_update_fulfill_htlc(&nodes[2].node.get_our_node_id(), &fulfill_msg);
+       expect_payment_forwarded!(nodes[1], Some(1000), false);
        check_added_monitors!(nodes[1], 1);
  
        let mut bs_updates = None;
index 10641d0abbff75aeba310bcdffc1455db54dd18e,b9a63f1c3215b80dc4883f45e8b9763d457ed867..f80377b7d4a9247fd7e30dea1ee4930eda270ea9
@@@ -41,7 -41,6 +41,7 @@@ use util::errors::APIError
  use util::config::{UserConfig,ChannelConfig};
  use util::scid_utils::scid_from_parts;
  
 +use io;
  use prelude::*;
  use core::{cmp,mem,fmt};
  use core::ops::Deref;
@@@ -307,6 -306,7 +307,7 @@@ pub struct CounterpartyForwardingInfo 
  enum UpdateFulfillFetch {
        NewClaim {
                monitor_update: ChannelMonitorUpdate,
+               htlc_value_msat: u64,
                msg: Option<msgs::UpdateFulfillHTLC>,
        },
        DuplicateClaim {},
@@@ -320,6 -320,8 +321,8 @@@ pub enum UpdateFulfillCommitFetch 
        NewClaim {
                /// The ChannelMonitorUpdate which places the new payment preimage in the channel monitor
                monitor_update: ChannelMonitorUpdate,
+               /// The value of the HTLC which was claimed, in msat.
+               htlc_value_msat: u64,
                /// The update_fulfill message and commitment_signed message (if the claim was not placed
                /// in the holding cell).
                msgs: Option<(msgs::UpdateFulfillHTLC, msgs::CommitmentSigned)>,
  // Holder designates channel data owned for the benefit of the user client.
  // Counterparty designates channel data owned by the other channel participant entity.
  pub(super) struct Channel<Signer: Sign> {
+       #[cfg(any(test, feature = "_test_utils"))]
+       pub(crate) config: ChannelConfig,
+       #[cfg(not(any(test, feature = "_test_utils")))]
        config: ChannelConfig,
  
        user_id: u64,
@@@ -1276,6 -1281,7 +1282,7 @@@ impl<Signer: Sign> Channel<Signer> 
                // these, but for now we just have to treat them as normal.
  
                let mut pending_idx = core::usize::MAX;
+               let mut htlc_value_msat = 0;
                for (idx, htlc) in self.pending_inbound_htlcs.iter().enumerate() {
                        if htlc.htlc_id == htlc_id_arg {
                                assert_eq!(htlc.payment_hash, payment_hash_calc);
                                        }
                                }
                                pending_idx = idx;
+                               htlc_value_msat = htlc.amount_msat;
                                break;
                        }
                }
                                                        // TODO: We may actually be able to switch to a fulfill here, though its
                                                        // rare enough it may not be worth the complexity burden.
                                                        debug_assert!(false, "Tried to fulfill an HTLC that was already failed");
-                                                       return UpdateFulfillFetch::NewClaim { monitor_update, msg: None };
+                                                       return UpdateFulfillFetch::NewClaim { monitor_update, htlc_value_msat, msg: None };
                                                }
                                        },
                                        _ => {}
                        });
                        #[cfg(any(test, feature = "fuzztarget"))]
                        self.historical_inbound_htlc_fulfills.insert(htlc_id_arg);
-                       return UpdateFulfillFetch::NewClaim { monitor_update, msg: None };
+                       return UpdateFulfillFetch::NewClaim { monitor_update, htlc_value_msat, msg: None };
                }
                #[cfg(any(test, feature = "fuzztarget"))]
                self.historical_inbound_htlc_fulfills.insert(htlc_id_arg);
                        if let InboundHTLCState::Committed = htlc.state {
                        } else {
                                debug_assert!(false, "Have an inbound HTLC we tried to claim before it was fully committed to");
-                               return UpdateFulfillFetch::NewClaim { monitor_update, msg: None };
+                               return UpdateFulfillFetch::NewClaim { monitor_update, htlc_value_msat, msg: None };
                        }
                        log_trace!(logger, "Upgrading HTLC {} to LocalRemoved with a Fulfill in channel {}!", log_bytes!(htlc.payment_hash.0), log_bytes!(self.channel_id));
                        htlc.state = InboundHTLCState::LocalRemoved(InboundHTLCRemovalReason::Fulfill(payment_preimage_arg.clone()));
  
                UpdateFulfillFetch::NewClaim {
                        monitor_update,
+                       htlc_value_msat,
                        msg: Some(msgs::UpdateFulfillHTLC {
                                channel_id: self.channel_id(),
                                htlc_id: htlc_id_arg,
  
        pub fn get_update_fulfill_htlc_and_commit<L: Deref>(&mut self, htlc_id: u64, payment_preimage: PaymentPreimage, logger: &L) -> Result<UpdateFulfillCommitFetch, (ChannelError, ChannelMonitorUpdate)> where L::Target: Logger {
                match self.get_update_fulfill_htlc(htlc_id, payment_preimage, logger) {
-                       UpdateFulfillFetch::NewClaim { mut monitor_update, msg: Some(update_fulfill_htlc) } => {
+                       UpdateFulfillFetch::NewClaim { mut monitor_update, htlc_value_msat, msg: Some(update_fulfill_htlc) } => {
                                let (commitment, mut additional_update) = match self.send_commitment_no_status_check(logger) {
                                        Err(e) => return Err((e, monitor_update)),
                                        Ok(res) => res
                                // strictly increasing by one, so decrement it here.
                                self.latest_monitor_update_id = monitor_update.update_id;
                                monitor_update.updates.append(&mut additional_update.updates);
-                               Ok(UpdateFulfillCommitFetch::NewClaim { monitor_update, msgs: Some((update_fulfill_htlc, commitment)) })
+                               Ok(UpdateFulfillCommitFetch::NewClaim { monitor_update, htlc_value_msat, msgs: Some((update_fulfill_htlc, commitment)) })
                        },
-                       UpdateFulfillFetch::NewClaim { monitor_update, msg: None } => Ok(UpdateFulfillCommitFetch::NewClaim { monitor_update, msgs: None }),
+                       UpdateFulfillFetch::NewClaim { monitor_update, htlc_value_msat, msg: None } =>
+                               Ok(UpdateFulfillCommitFetch::NewClaim { monitor_update, htlc_value_msat, msgs: None }),
                        UpdateFulfillFetch::DuplicateClaim {} => Ok(UpdateFulfillCommitFetch::DuplicateClaim {}),
                }
        }
  
        /// Marks an outbound HTLC which we have received update_fail/fulfill/malformed
        #[inline]
-       fn mark_outbound_htlc_removed(&mut self, htlc_id: u64, check_preimage: Option<PaymentHash>, fail_reason: Option<HTLCFailReason>) -> Result<&HTLCSource, ChannelError> {
+       fn mark_outbound_htlc_removed(&mut self, htlc_id: u64, check_preimage: Option<PaymentHash>, fail_reason: Option<HTLCFailReason>) -> Result<&OutboundHTLCOutput, ChannelError> {
                for htlc in self.pending_outbound_htlcs.iter_mut() {
                        if htlc.htlc_id == htlc_id {
                                match check_preimage {
                                        OutboundHTLCState::AwaitingRemoteRevokeToRemove(_) | OutboundHTLCState::AwaitingRemovedRemoteRevoke(_) | OutboundHTLCState::RemoteRemoved(_) =>
                                                return Err(ChannelError::Close(format!("Remote tried to fulfill/fail HTLC ({}) that they'd already fulfilled/failed", htlc_id))),
                                }
-                               return Ok(&htlc.source);
+                               return Ok(htlc);
                        }
                }
                Err(ChannelError::Close("Remote tried to fulfill/fail an HTLC we couldn't find".to_owned()))
        }
  
-       pub fn update_fulfill_htlc(&mut self, msg: &msgs::UpdateFulfillHTLC) -> Result<HTLCSource, ChannelError> {
+       pub fn update_fulfill_htlc(&mut self, msg: &msgs::UpdateFulfillHTLC) -> Result<(HTLCSource, u64), ChannelError> {
                if (self.channel_state & (ChannelState::ChannelFunded as u32)) != (ChannelState::ChannelFunded as u32) {
                        return Err(ChannelError::Close("Got fulfill HTLC message when channel was not in an operational state".to_owned()));
                }
                }
  
                let payment_hash = PaymentHash(Sha256::hash(&msg.payment_preimage.0[..]).into_inner());
-               self.mark_outbound_htlc_removed(msg.htlc_id, Some(payment_hash), None).map(|source| source.clone())
+               self.mark_outbound_htlc_removed(msg.htlc_id, Some(payment_hash), None).map(|htlc| (htlc.source.clone(), htlc.amount_msat))
        }
  
        pub fn update_fail_htlc(&mut self, msg: &msgs::UpdateFailHTLC, fail_reason: HTLCFailReason) -> Result<(), ChannelError> {
                                                // in it hitting the holding cell again and we cannot change the state of a
                                                // holding cell HTLC from fulfill to anything else.
                                                let (update_fulfill_msg_option, mut additional_monitor_update) =
-                                                       if let UpdateFulfillFetch::NewClaim { msg, monitor_update } = self.get_update_fulfill_htlc(htlc_id, *payment_preimage, logger) {
+                                                       if let UpdateFulfillFetch::NewClaim { msg, monitor_update, .. } = self.get_update_fulfill_htlc(htlc_id, *payment_preimage, logger) {
                                                                (msg, monitor_update)
                                                        } else { unreachable!() };
                                                update_fulfill_htlcs.push(update_fulfill_msg_option.unwrap());
@@@ -4513,7 -4522,7 +4523,7 @@@ impl_writeable_tlv_based_enum!(InboundH
  );
  
  impl Writeable for ChannelUpdateStatus {
 -      fn write<W: Writer>(&self, writer: &mut W) -> Result<(), ::std::io::Error> {
 +      fn write<W: Writer>(&self, writer: &mut W) -> Result<(), io::Error> {
                // We only care about writing out the current state as it was announced, ie only either
                // Enabled or Disabled. In the case of DisabledStaged, we most recently announced the
                // channel as enabled, so we write 0. For EnabledStaged, we similarly write a 1.
  }
  
  impl Readable for ChannelUpdateStatus {
 -      fn read<R: ::std::io::Read>(reader: &mut R) -> Result<Self, DecodeError> {
 +      fn read<R: io::Read>(reader: &mut R) -> Result<Self, DecodeError> {
                Ok(match <u8 as Readable>::read(reader)? {
                        0 => ChannelUpdateStatus::Enabled,
                        1 => ChannelUpdateStatus::Disabled,
  }
  
  impl<Signer: Sign> Writeable for Channel<Signer> {
 -      fn write<W: Writer>(&self, writer: &mut W) -> Result<(), ::std::io::Error> {
 +      fn write<W: Writer>(&self, writer: &mut W) -> Result<(), io::Error> {
                // Note that we write out as if remove_uncommitted_htlcs_and_mark_paused had just been
                // called.
  
  const MAX_ALLOC_SIZE: usize = 64*1024;
  impl<'a, Signer: Sign, K: Deref> ReadableArgs<&'a K> for Channel<Signer>
                where K::Target: KeysInterface<Signer = Signer> {
 -      fn read<R : ::std::io::Read>(reader: &mut R, keys_source: &'a K) -> Result<Self, DecodeError> {
 +      fn read<R : io::Read>(reader: &mut R, keys_source: &'a K) -> Result<Self, DecodeError> {
                let ver = read_ver_prefix!(reader, SERIALIZATION_VERSION);
  
                let user_id = Readable::read(reader)?;
index 7e42705c226344fe9dad5f0830e27e3821700445,ad9a7e2955494cd53a95a63542a22d42539c7cc7..afbfd0ee970da43bc050c488281cf16da4c4e8c7
@@@ -60,11 -60,10 +60,11 @@@ use util::chacha20::{ChaCha20, ChaChaRe
  use util::logger::{Logger, Level};
  use util::errors::APIError;
  
 +use io;
  use prelude::*;
  use core::{cmp, mem};
  use core::cell::RefCell;
 -use std::io::{Cursor, Read};
 +use io::{Cursor, Read};
  use sync::{Arc, Condvar, Mutex, MutexGuard, RwLock, RwLockReadGuard};
  use core::sync::atomic::{AtomicUsize, Ordering};
  use core::time::Duration;
@@@ -208,6 -207,14 +208,14 @@@ pub(super) enum HTLCFailReason 
        }
  }
  
+ /// Return value for claim_funds_from_hop
+ enum ClaimFundsFromHop {
+       PrevHopForceClosed,
+       MonitorUpdateFail(PublicKey, MsgHandleErrInternal, Option<u64>),
+       Success(u64),
+       DuplicateClaim,
+ }
  type ShutdownResult = (Option<(OutPoint, ChannelMonitorUpdate)>, Vec<(HTLCSource, PaymentHash)>);
  
  /// Error type returned across the channel_state mutex boundary. When an Err is generated for a
@@@ -2788,16 -2795,22 +2796,22 @@@ impl<Signer: Sign, M: Deref, T: Deref, 
                                                                         HTLCFailReason::Reason { failure_code: 0x4000|15, data: htlc_msat_height_data });
                                } else {
                                        match self.claim_funds_from_hop(channel_state.as_mut().unwrap(), htlc.prev_hop, payment_preimage) {
-                                               Err(Some(e)) => {
-                                                       if let msgs::ErrorAction::IgnoreError = e.1.err.action {
+                                               ClaimFundsFromHop::MonitorUpdateFail(pk, err, _) => {
+                                                       if let msgs::ErrorAction::IgnoreError = err.err.action {
                                                                // We got a temporary failure updating monitor, but will claim the
                                                                // HTLC when the monitor updating is restored (or on chain).
-                                                               log_error!(self.logger, "Temporary failure claiming HTLC, treating as success: {}", e.1.err.err);
+                                                               log_error!(self.logger, "Temporary failure claiming HTLC, treating as success: {}", err.err.err);
                                                                claimed_any_htlcs = true;
-                                                       } else { errs.push(e); }
+                                                       } else { errs.push((pk, err)); }
+                                               },
+                                               ClaimFundsFromHop::PrevHopForceClosed => unreachable!("We already checked for channel existence, we can't fail here!"),
+                                               ClaimFundsFromHop::DuplicateClaim => {
+                                                       // While we should never get here in most cases, if we do, it likely
+                                                       // indicates that the HTLC was timed out some time ago and is no longer
+                                                       // available to be claimed. Thus, it does not make sense to set
+                                                       // `claimed_any_htlcs`.
                                                },
-                                               Err(None) => unreachable!("We already checked for channel existence, we can't fail here!"),
-                                               Ok(()) => claimed_any_htlcs = true,
+                                               ClaimFundsFromHop::Success(_) => claimed_any_htlcs = true,
                                        }
                                }
                        }
                } else { false }
        }
  
-       fn claim_funds_from_hop(&self, channel_state_lock: &mut MutexGuard<ChannelHolder<Signer>>, prev_hop: HTLCPreviousHopData, payment_preimage: PaymentPreimage) -> Result<(), Option<(PublicKey, MsgHandleErrInternal)>> {
+       fn claim_funds_from_hop(&self, channel_state_lock: &mut MutexGuard<ChannelHolder<Signer>>, prev_hop: HTLCPreviousHopData, payment_preimage: PaymentPreimage) -> ClaimFundsFromHop {
                //TODO: Delay the claimed_funds relaying just like we do outbound relay!
                let channel_state = &mut **channel_state_lock;
                let chan_id = match channel_state.short_to_id.get(&prev_hop.short_channel_id) {
                        Some(chan_id) => chan_id.clone(),
                        None => {
-                               return Err(None)
+                               return ClaimFundsFromHop::PrevHopForceClosed
                        }
                };
  
                if let hash_map::Entry::Occupied(mut chan) = channel_state.by_id.entry(chan_id) {
                        match chan.get_mut().get_update_fulfill_htlc_and_commit(prev_hop.htlc_id, payment_preimage, &self.logger) {
                                Ok(msgs_monitor_option) => {
-                                       if let UpdateFulfillCommitFetch::NewClaim { msgs, monitor_update } = msgs_monitor_option {
+                                       if let UpdateFulfillCommitFetch::NewClaim { msgs, htlc_value_msat, monitor_update } = msgs_monitor_option {
                                                if let Err(e) = self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), monitor_update) {
                                                        log_given_level!(self.logger, if e == ChannelMonitorUpdateErr::PermanentFailure { Level::Error } else { Level::Debug },
                                                                "Failed to update channel monitor with preimage {:?}: {:?}",
                                                                payment_preimage, e);
-                                                       return Err(Some((
+                                                       return ClaimFundsFromHop::MonitorUpdateFail(
                                                                chan.get().get_counterparty_node_id(),
                                                                handle_monitor_err!(self, e, channel_state, chan, RAACommitmentOrder::CommitmentFirst, false, msgs.is_some()).unwrap_err(),
-                                                       )));
+                                                               Some(htlc_value_msat)
+                                                       );
                                                }
                                                if let Some((msg, commitment_signed)) = msgs {
                                                        log_debug!(self.logger, "Claiming funds for HTLC with preimage {} resulted in a commitment_signed for channel {}",
                                                                }
                                                        });
                                                }
+                                               return ClaimFundsFromHop::Success(htlc_value_msat);
+                                       } else {
+                                               return ClaimFundsFromHop::DuplicateClaim;
                                        }
-                                       return Ok(())
                                },
                                Err((e, monitor_update)) => {
                                        if let Err(e) = self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), monitor_update) {
                                        if drop {
                                                chan.remove_entry();
                                        }
-                                       return Err(Some((counterparty_node_id, res)));
+                                       return ClaimFundsFromHop::MonitorUpdateFail(counterparty_node_id, res, None);
                                },
                        }
                } else { unreachable!(); }
        }
  
-       fn claim_funds_internal(&self, mut channel_state_lock: MutexGuard<ChannelHolder<Signer>>, source: HTLCSource, payment_preimage: PaymentPreimage) {
+       fn claim_funds_internal(&self, mut channel_state_lock: MutexGuard<ChannelHolder<Signer>>, source: HTLCSource, payment_preimage: PaymentPreimage, forwarded_htlc_value_msat: Option<u64>, from_onchain: bool) {
                match source {
                        HTLCSource::OutboundRoute { session_priv, .. } => {
                                mem::drop(channel_state_lock);
                        },
                        HTLCSource::PreviousHopData(hop_data) => {
                                let prev_outpoint = hop_data.outpoint;
-                               if let Err((counterparty_node_id, err)) = match self.claim_funds_from_hop(&mut channel_state_lock, hop_data, payment_preimage) {
-                                       Ok(()) => Ok(()),
-                                       Err(None) => {
-                                               let preimage_update = ChannelMonitorUpdate {
-                                                       update_id: CLOSED_CHANNEL_UPDATE_ID,
-                                                       updates: vec![ChannelMonitorUpdateStep::PaymentPreimage {
-                                                               payment_preimage: payment_preimage.clone(),
-                                                       }],
-                                               };
-                                               // We update the ChannelMonitor on the backward link, after
-                                               // receiving an offchain preimage event from the forward link (the
-                                               // event being update_fulfill_htlc).
-                                               if let Err(e) = self.chain_monitor.update_channel(prev_outpoint, preimage_update) {
-                                                       log_error!(self.logger, "Critical error: failed to update channel monitor with preimage {:?}: {:?}",
-                                                                  payment_preimage, e);
-                                               }
-                                               Ok(())
-                                       },
-                                       Err(Some(res)) => Err(res),
-                               } {
-                                       mem::drop(channel_state_lock);
-                                       let res: Result<(), _> = Err(err);
-                                       let _ = handle_error!(self, res, counterparty_node_id);
+                               let res = self.claim_funds_from_hop(&mut channel_state_lock, hop_data, payment_preimage);
+                               let claimed_htlc = if let ClaimFundsFromHop::DuplicateClaim = res { false } else { true };
+                               let htlc_claim_value_msat = match res {
+                                       ClaimFundsFromHop::MonitorUpdateFail(_, _, amt_opt) => amt_opt,
+                                       ClaimFundsFromHop::Success(amt) => Some(amt),
+                                       _ => None,
+                               };
+                               if let ClaimFundsFromHop::PrevHopForceClosed = res {
+                                       let preimage_update = ChannelMonitorUpdate {
+                                               update_id: CLOSED_CHANNEL_UPDATE_ID,
+                                               updates: vec![ChannelMonitorUpdateStep::PaymentPreimage {
+                                                       payment_preimage: payment_preimage.clone(),
+                                               }],
+                                       };
+                                       // We update the ChannelMonitor on the backward link, after
+                                       // receiving an offchain preimage event from the forward link (the
+                                       // event being update_fulfill_htlc).
+                                       if let Err(e) = self.chain_monitor.update_channel(prev_outpoint, preimage_update) {
+                                               log_error!(self.logger, "Critical error: failed to update channel monitor with preimage {:?}: {:?}",
+                                                                                        payment_preimage, e);
+                                       }
+                                       // Note that we do *not* set `claimed_htlc` to false here. In fact, this
+                                       // totally could be a duplicate claim, but we have no way of knowing
+                                       // without interrogating the `ChannelMonitor` we've provided the above
+                                       // update to. Instead, we simply document in `PaymentForwarded` that this
+                                       // can happen.
+                               }
+                               mem::drop(channel_state_lock);
+                               if let ClaimFundsFromHop::MonitorUpdateFail(pk, err, _) = res {
+                                       let result: Result<(), _> = Err(err);
+                                       let _ = handle_error!(self, result, pk);
+                               }
+                               if claimed_htlc {
+                                       if let Some(forwarded_htlc_value) = forwarded_htlc_value_msat {
+                                               let fee_earned_msat = if let Some(claimed_htlc_value) = htlc_claim_value_msat {
+                                                       Some(claimed_htlc_value - forwarded_htlc_value)
+                                               } else { None };
+                                               let mut pending_events = self.pending_events.lock().unwrap();
+                                               pending_events.push(events::Event::PaymentForwarded {
+                                                       fee_earned_msat,
+                                                       claim_from_onchain_tx: from_onchain,
+                                               });
+                                       }
                                }
                        },
                }
  
        fn internal_update_fulfill_htlc(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateFulfillHTLC) -> Result<(), MsgHandleErrInternal> {
                let mut channel_lock = self.channel_state.lock().unwrap();
-               let htlc_source = {
+               let (htlc_source, forwarded_htlc_value) = {
                        let channel_state = &mut *channel_lock;
                        match channel_state.by_id.entry(msg.channel_id) {
                                hash_map::Entry::Occupied(mut chan) => {
                                hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel".to_owned(), msg.channel_id))
                        }
                };
-               self.claim_funds_internal(channel_lock, htlc_source, msg.payment_preimage.clone());
+               self.claim_funds_internal(channel_lock, htlc_source, msg.payment_preimage.clone(), Some(forwarded_htlc_value), false);
                Ok(())
        }
  
        /// Process pending events from the `chain::Watch`, returning whether any events were processed.
        fn process_pending_monitor_events(&self) -> bool {
                let mut failed_channels = Vec::new();
-               let pending_monitor_events = self.chain_monitor.release_pending_monitor_events();
+               let mut pending_monitor_events = self.chain_monitor.release_pending_monitor_events();
                let has_pending_monitor_events = !pending_monitor_events.is_empty();
-               for monitor_event in pending_monitor_events {
+               for monitor_event in pending_monitor_events.drain(..) {
                        match monitor_event {
                                MonitorEvent::HTLCEvent(htlc_update) => {
                                        if let Some(preimage) = htlc_update.payment_preimage {
                                                log_trace!(self.logger, "Claiming HTLC with preimage {} from our monitor", log_bytes!(preimage.0));
-                                               self.claim_funds_internal(self.channel_state.lock().unwrap(), htlc_update.source, preimage);
+                                               self.claim_funds_internal(self.channel_state.lock().unwrap(), htlc_update.source, preimage, htlc_update.onchain_value_satoshis.map(|v| v * 1000), true);
                                        } else {
                                                log_trace!(self.logger, "Failing HTLC with hash {} from our monitor", log_bytes!(htlc_update.payment_hash.0));
                                                self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), htlc_update.source, &htlc_update.payment_hash, HTLCFailReason::Reason { failure_code: 0x4000 | 8, data: Vec::new() });
@@@ -3991,7 -4029,7 +4030,7 @@@ wher
                                result = NotifyOption::DoPersist;
                        }
  
 -                      let mut pending_events = std::mem::replace(&mut *self.pending_events.lock().unwrap(), vec![]);
 +                      let mut pending_events = mem::replace(&mut *self.pending_events.lock().unwrap(), vec![]);
                        if !pending_events.is_empty() {
                                result = NotifyOption::DoPersist;
                        }
@@@ -4611,7 -4649,7 +4650,7 @@@ impl_writeable_tlv_based!(HTLCPreviousH
  });
  
  impl Writeable for ClaimableHTLC {
 -      fn write<W: Writer>(&self, writer: &mut W) -> Result<(), ::std::io::Error> {
 +      fn write<W: Writer>(&self, writer: &mut W) -> Result<(), io::Error> {
                let payment_data = match &self.onion_payload {
                        OnionPayload::Invoice(data) => Some(data.clone()),
                        _ => None,
@@@ -4715,7 -4753,7 +4754,7 @@@ impl<Signer: Sign, M: Deref, T: Deref, 
          F::Target: FeeEstimator,
          L::Target: Logger,
  {
 -      fn write<W: Writer>(&self, writer: &mut W) -> Result<(), ::std::io::Error> {
 +      fn write<W: Writer>(&self, writer: &mut W) -> Result<(), io::Error> {
                let _consistency_lock = self.total_consistency_lock.write().unwrap();
  
                write_ver_prefix!(writer, SERIALIZATION_VERSION, MIN_SERIALIZATION_VERSION);
@@@ -4911,7 -4949,7 +4950,7 @@@ impl<'a, Signer: Sign, M: Deref, T: Der
          F::Target: FeeEstimator,
          L::Target: Logger,
  {
 -      fn read<R: ::std::io::Read>(reader: &mut R, args: ChannelManagerReadArgs<'a, Signer, M, T, K, F, L>) -> Result<Self, DecodeError> {
 +      fn read<R: io::Read>(reader: &mut R, args: ChannelManagerReadArgs<'a, Signer, M, T, K, F, L>) -> Result<Self, DecodeError> {
                let (blockhash, chan_manager) = <(BlockHash, ChannelManager<Signer, M, T, K, F, L>)>::read(reader, args)?;
                Ok((blockhash, Arc::new(chan_manager)))
        }
@@@ -4925,7 -4963,7 +4964,7 @@@ impl<'a, Signer: Sign, M: Deref, T: Der
          F::Target: FeeEstimator,
          L::Target: Logger,
  {
 -      fn read<R: ::std::io::Read>(reader: &mut R, mut args: ChannelManagerReadArgs<'a, Signer, M, T, K, F, L>) -> Result<Self, DecodeError> {
 +      fn read<R: io::Read>(reader: &mut R, mut args: ChannelManagerReadArgs<'a, Signer, M, T, K, F, L>) -> Result<Self, DecodeError> {
                let _ver = read_ver_prefix!(reader, SERIALIZATION_VERSION);
  
                let genesis_hash: BlockHash = Readable::read(reader)?;
  mod tests {
        use bitcoin::hashes::Hash;
        use bitcoin::hashes::sha256::Hash as Sha256;
 -      use core::sync::atomic::{AtomicBool, Ordering};
        use core::time::Duration;
        use ln::{PaymentPreimage, PaymentHash, PaymentSecret};
 -      use ln::channelmanager::PersistenceNotifier;
        use ln::features::{InitFeatures, InvoiceFeatures};
        use ln::functional_test_utils::*;
        use ln::msgs;
        use routing::router::{get_keysend_route, get_route};
        use util::events::{Event, MessageSendEvent, MessageSendEventsProvider};
        use util::test_utils;
 -      use std::sync::Arc;
 -      use std::thread;
  
        #[cfg(feature = "std")]
        #[test]
        fn test_wait_timeout() {
 +              use ln::channelmanager::PersistenceNotifier;
 +              use sync::Arc;
 +              use core::sync::atomic::{AtomicBool, Ordering};
 +              use std::thread;
 +
                let persistence_notifier = Arc::new(PersistenceNotifier::new());
                let thread_notifier = Arc::clone(&persistence_notifier);
  
                let node_chanmgrs = create_node_chanmgrs(3, &node_cfgs, &[None, None, None]);
                let nodes = create_network(3, &node_cfgs, &node_chanmgrs);
  
 +              // All nodes start with a persistable update pending as `create_network` connects each node
 +              // with all other nodes to make most tests simpler.
 +              assert!(nodes[0].node.await_persistable_update_timeout(Duration::from_millis(1)));
 +              assert!(nodes[1].node.await_persistable_update_timeout(Duration::from_millis(1)));
 +              assert!(nodes[2].node.await_persistable_update_timeout(Duration::from_millis(1)));
 +
                let mut chan = create_announced_chan_between_nodes(&nodes, 0, 1, InitFeatures::known(), InitFeatures::known());
  
                // We check that the channel info nodes have doesn't change too early, even though we try
index feeb691b1ceb9b0029d82c42e1b994285c65002e,bea2990ccb560a35a6d9fba06b52073f02731bcf..807bb20f7ece8f460facfa7dc8d955f4dff7c642
@@@ -39,7 -39,6 +39,7 @@@ use bitcoin::hash_types::BlockHash
  
  use bitcoin::secp256k1::key::PublicKey;
  
 +use io;
  use prelude::*;
  use core::cell::RefCell;
  use std::rc::Rc;
@@@ -240,7 -239,7 +240,7 @@@ impl<'a, 'b, 'c> Drop for Node<'a, 'b, 
                                let mut w = test_utils::TestVecWriter(Vec::new());
                                let network_graph_ser = self.net_graph_msg_handler.network_graph.read().unwrap();
                                network_graph_ser.write(&mut w).unwrap();
 -                              let network_graph_deser = <NetworkGraph>::read(&mut ::std::io::Cursor::new(&w.0)).unwrap();
 +                              let network_graph_deser = <NetworkGraph>::read(&mut io::Cursor::new(&w.0)).unwrap();
                                assert!(network_graph_deser == *self.net_graph_msg_handler.network_graph.read().unwrap());
                                let net_graph_msg_handler = NetGraphMsgHandler::from_net_graph(
                                        Some(self.chain_source), self.logger, network_graph_deser
                                        let mut w = test_utils::TestVecWriter(Vec::new());
                                        old_monitor.write(&mut w).unwrap();
                                        let (_, deserialized_monitor) = <(BlockHash, ChannelMonitor<EnforcingSigner>)>::read(
 -                                              &mut ::std::io::Cursor::new(&w.0), self.keys_manager).unwrap();
 +                                              &mut io::Cursor::new(&w.0), self.keys_manager).unwrap();
                                        deserialized_monitors.push(deserialized_monitor);
                                }
                        }
  
                                let mut w = test_utils::TestVecWriter(Vec::new());
                                self.node.write(&mut w).unwrap();
 -                              <(BlockHash, ChannelManager<EnforcingSigner, &test_utils::TestChainMonitor, &test_utils::TestBroadcaster, &test_utils::TestKeysInterface, &test_utils::TestFeeEstimator, &test_utils::TestLogger>)>::read(&mut ::std::io::Cursor::new(w.0), ChannelManagerReadArgs {
 +                              <(BlockHash, ChannelManager<EnforcingSigner, &test_utils::TestChainMonitor, &test_utils::TestBroadcaster, &test_utils::TestKeysInterface, &test_utils::TestFeeEstimator, &test_utils::TestLogger>)>::read(&mut io::Cursor::new(w.0), ChannelManagerReadArgs {
                                        default_config: *self.node.get_current_default_configuration(),
                                        keys_manager: self.keys_manager,
                                        fee_estimator: &test_utils::TestFeeEstimator { sat_per_kw: Mutex::new(253) },
@@@ -1006,6 -1005,20 +1006,20 @@@ macro_rules! expect_payment_sent 
        }
  }
  
+ macro_rules! expect_payment_forwarded {
+       ($node: expr, $expected_fee: expr, $upstream_force_closed: expr) => {
+               let events = $node.node.get_and_clear_pending_events();
+               assert_eq!(events.len(), 1);
+               match events[0] {
+                       Event::PaymentForwarded { fee_earned_msat, claim_from_onchain_tx } => {
+                               assert_eq!(fee_earned_msat, $expected_fee);
+                               assert_eq!(claim_from_onchain_tx, $upstream_force_closed);
+                       },
+                       _ => panic!("Unexpected event"),
+               }
+       }
+ }
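
The new `expect_payment_forwarded!` helper asserts on the event added by this PR and is used throughout the functional tests below. A minimal sketch of the intended call pattern, mirroring the fulfill-handling sites changed later in this diff (`nodes`, `updates` and the 1000 msat base fee are the usual functional-test fixtures, assumed here rather than set up):

    // nodes[1] forwarded an HTLC from nodes[0] to nodes[2]; once nodes[2]'s
    // update_fulfill_htlc is handled, the forwarding node should surface a
    // PaymentForwarded event for its base fee, claimed off-chain.
    nodes[1].node.handle_update_fulfill_htlc(&nodes[2].node.get_our_node_id(), &updates.update_fulfill_htlcs[0]);
    expect_payment_forwarded!(nodes[1], Some(1000), false);
    check_added_monitors!(nodes[1], 1);
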
  #[cfg(test)]
  macro_rules! expect_payment_failure_chan_update {
        ($node: expr, $scid: expr, $chan_closed: expr) => {
@@@ -1170,6 -1183,8 +1184,8 @@@ pub fn claim_payment_along_route<'a, 'b
                        ($node: expr, $prev_node: expr, $new_msgs: expr) => {
                                {
                                        $node.node.handle_update_fulfill_htlc(&$prev_node.node.get_our_node_id(), &next_msgs.as_ref().unwrap().0);
+                                       let fee = $node.node.channel_state.lock().unwrap().by_id.get(&next_msgs.as_ref().unwrap().0.channel_id).unwrap().config.forwarding_fee_base_msat;
+                                       expect_payment_forwarded!($node, Some(fee as u64), false);
                                        check_added_monitors!($node, 1);
                                        let new_next_msgs = if $new_msgs {
                                                let events = $node.node.get_and_clear_pending_msg_events();
@@@ -1405,13 -1420,6 +1421,13 @@@ pub fn create_network<'a, 'b: 'a, 'c: '
                })
        }
  
 +      for i in 0..node_count {
 +              for j in (i+1)..node_count {
 +                      nodes[i].node.peer_connected(&nodes[j].node.get_our_node_id(), &msgs::Init { features: InitFeatures::known() });
 +                      nodes[j].node.peer_connected(&nodes[i].node.get_our_node_id(), &msgs::Init { features: InitFeatures::known() });
 +              }
 +      }
 +
        nodes
  }
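
Since `create_network` now calls `peer_connected` for every node pair, each freshly created node already has a persistable update pending; tests that watch `await_persistable_update_timeout` (such as the channelmanager test earlier in this diff) drain that initial notification first. A short sketch of the pattern, with the usual test fixtures and `core::time::Duration` assumed:

    // Every peer_connected call marks the ChannelManager as needing persistence,
    // so the first await returns immediately for each node built by create_network.
    let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
    assert!(nodes[0].node.await_persistable_update_timeout(Duration::from_millis(1)));
    assert!(nodes[1].node.await_persistable_update_timeout(Duration::from_millis(1)));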
  
index 7e9eb4c96a993755d2bc814987506c1cbf1b3b97,6dec20b2afbde201426241427524e42131fbe8bd..6e5395e65c651440946f22e92a8ada537c9300ee
@@@ -51,7 -51,6 +51,7 @@@ use bitcoin::secp256k1::key::{PublicKey
  
  use regex;
  
 +use io;
  use prelude::*;
  use alloc::collections::BTreeSet;
  use core::default::Default;
@@@ -887,6 -886,7 +887,7 @@@ fn updates_shutdown_wait() 
        assert!(updates.update_fee.is_none());
        assert_eq!(updates.update_fulfill_htlcs.len(), 1);
        nodes[1].node.handle_update_fulfill_htlc(&nodes[2].node.get_our_node_id(), &updates.update_fulfill_htlcs[0]);
+       expect_payment_forwarded!(nodes[1], Some(1000), false);
        check_added_monitors!(nodes[1], 1);
        let updates_2 = get_htlc_update_msgs!(nodes[1], nodes[0].node.get_our_node_id());
        commitment_signed_dance!(nodes[1], nodes[2], updates.commitment_signed, false);
@@@ -1061,6 -1061,7 +1062,7 @@@ fn do_test_shutdown_rebroadcast(recv_co
        assert!(updates.update_fee.is_none());
        assert_eq!(updates.update_fulfill_htlcs.len(), 1);
        nodes[1].node.handle_update_fulfill_htlc(&nodes[2].node.get_our_node_id(), &updates.update_fulfill_htlcs[0]);
+       expect_payment_forwarded!(nodes[1], Some(1000), false);
        check_added_monitors!(nodes[1], 1);
        let updates_2 = get_htlc_update_msgs!(nodes[1], nodes[0].node.get_our_node_id());
        commitment_signed_dance!(nodes[1], nodes[2], updates.commitment_signed, false);
@@@ -2833,6 -2834,12 +2835,12 @@@ fn test_htlc_on_chain_success() 
                assert_eq!(added_monitors[0].0.txid, chan_2.3.txid());
                added_monitors.clear();
        }
+       let forwarded_events = nodes[1].node.get_and_clear_pending_events();
+       assert_eq!(forwarded_events.len(), 2);
+       if let Event::PaymentForwarded { fee_earned_msat: Some(1000), claim_from_onchain_tx: true } = forwarded_events[0] {
+       } else { panic!(); }
+       if let Event::PaymentForwarded { fee_earned_msat: Some(1000), claim_from_onchain_tx: true } = forwarded_events[1] {
+       } else { panic!(); }
        let events = nodes[1].node.get_and_clear_pending_msg_events();
        {
                let mut added_monitors = nodes[1].chain_monitor.added_monitors.lock().unwrap();
@@@ -4550,7 -4557,7 +4558,7 @@@ fn test_dup_htlc_onchain_fails_on_reloa
                let mut channel_monitors = HashMap::new();
                channel_monitors.insert(chan_0_monitor.get_funding_txo().0, &mut chan_0_monitor);
                <(BlockHash, ChannelManager<EnforcingSigner, &test_utils::TestChainMonitor, &test_utils::TestBroadcaster, &test_utils::TestKeysInterface, &test_utils::TestFeeEstimator, &test_utils::TestLogger>)>
 -                      ::read(&mut std::io::Cursor::new(&chan_manager_serialized.0[..]), ChannelManagerReadArgs {
 +                      ::read(&mut io::Cursor::new(&chan_manager_serialized.0[..]), ChannelManagerReadArgs {
                                default_config: Default::default(),
                                keys_manager,
                                fee_estimator: node_cfgs[0].fee_estimator,
@@@ -5330,19 -5337,15 +5338,15 @@@ fn test_onchain_to_onchain_claim() 
        // So we broadcast C's commitment tx and HTLC-Success on B's chain, we should successfully be able to extract preimage and update downstream monitor
        let header = BlockHeader { version: 0x20000000, prev_blockhash: nodes[1].best_block_hash(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42};
        connect_block(&nodes[1], &Block { header, txdata: vec![c_txn[1].clone(), c_txn[2].clone()]});
-       connect_blocks(&nodes[1], TEST_FINAL_CLTV - 1); // Confirm blocks until the HTLC expires
+       check_added_monitors!(nodes[1], 1);
+       expect_payment_forwarded!(nodes[1], Some(1000), true);
        {
                let mut b_txn = nodes[1].tx_broadcaster.txn_broadcasted.lock().unwrap();
-               // ChannelMonitor: claim tx, ChannelManager: local commitment tx
-               assert_eq!(b_txn.len(), 2);
+               // ChannelMonitor: claim tx
+               assert_eq!(b_txn.len(), 1);
                check_spends!(b_txn[0], chan_2.3); // B local commitment tx, issued by ChannelManager
-               check_spends!(b_txn[1], c_txn[1]); // timeout tx on C remote commitment tx, issued by ChannelMonitor
-               assert_eq!(b_txn[1].input[0].witness.clone().last().unwrap().len(), ACCEPTED_HTLC_SCRIPT_WEIGHT);
-               assert!(b_txn[1].output[0].script_pubkey.is_v0_p2wpkh()); // direct payment
-               assert_ne!(b_txn[1].lock_time, 0); // Timeout tx
                b_txn.clear();
        }
-       check_added_monitors!(nodes[1], 1);
        let msg_events = nodes[1].node.get_and_clear_pending_msg_events();
        assert_eq!(msg_events.len(), 3);
        check_added_monitors!(nodes[1], 1);
        let commitment_tx = get_local_commitment_txn!(nodes[0], chan_1.2);
        mine_transaction(&nodes[1], &commitment_tx[0]);
        let b_txn = nodes[1].tx_broadcaster.txn_broadcasted.lock().unwrap();
-       // ChannelMonitor: HTLC-Success tx + HTLC-Timeout RBF Bump, ChannelManager: local commitment tx + HTLC-Success tx
-       assert_eq!(b_txn.len(), 4);
-       check_spends!(b_txn[2], chan_1.3);
-       check_spends!(b_txn[3], b_txn[2]);
-       let (htlc_success_claim, htlc_timeout_bumped) =
-               if b_txn[0].input[0].previous_output.txid == commitment_tx[0].txid()
-                       { (&b_txn[0], &b_txn[1]) } else { (&b_txn[1], &b_txn[0]) };
-       check_spends!(htlc_success_claim, commitment_tx[0]);
-       assert_eq!(htlc_success_claim.input[0].witness.clone().last().unwrap().len(), OFFERED_HTLC_SCRIPT_WEIGHT);
-       assert!(htlc_success_claim.output[0].script_pubkey.is_v0_p2wpkh()); // direct payment
-       assert_eq!(htlc_success_claim.lock_time, 0); // Success tx
-       check_spends!(htlc_timeout_bumped, c_txn[1]); // timeout tx on C remote commitment tx, issued by ChannelMonitor
-       assert_ne!(htlc_timeout_bumped.lock_time, 0); // Success tx
+       // ChannelMonitor: HTLC-Success tx, ChannelManager: local commitment tx + HTLC-Success tx
+       assert_eq!(b_txn.len(), 3);
+       check_spends!(b_txn[1], chan_1.3);
+       check_spends!(b_txn[2], b_txn[1]);
+       check_spends!(b_txn[0], commitment_tx[0]);
+       assert_eq!(b_txn[0].input[0].witness.clone().last().unwrap().len(), OFFERED_HTLC_SCRIPT_WEIGHT);
+       assert!(b_txn[0].output[0].script_pubkey.is_v0_p2wpkh()); // direct payment
+       assert_eq!(b_txn[0].lock_time, 0); // Success tx
  
        check_closed_broadcast!(nodes[1], true);
        check_added_monitors!(nodes[1], 1);
@@@ -5498,7 -5496,10 +5497,10 @@@ fn test_duplicate_payment_hash_one_fail
        expect_payment_failed!(nodes[0], duplicate_payment_hash, false);
  
        // Solve 2nd HTLC by broadcasting on B's chain HTLC-Success Tx from C
+       // Note that the fee paid is effectively double, as the HTLC value (which includes the
+       // nodes[1] fee and the nodes[2] fee) is rounded down and then claimed in full.
        mine_transaction(&nodes[1], &htlc_success_txn[0]);
+       expect_payment_forwarded!(nodes[1], Some(196*2), true);
        let updates = get_htlc_update_msgs!(nodes[1], nodes[0].node.get_our_node_id());
        assert!(updates.update_add_htlcs.is_empty());
        assert!(updates.update_fail_htlcs.is_empty());
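
The rounding note above matches the `fee_earned_msat` documentation added in events.rs later in this diff: when the downstream HTLC is claimed on-chain its value is only known rounded down to whole satoshis, while the upstream HTLC is still claimed for its full millisatoshi value. A purely illustrative calculation (amounts assumed, not taken from this test) of how that can roughly double the recorded fee:

    // Hypothetical amounts: a 196 msat nominal forwarding fee, with an outbound
    // HTLC whose sub-satoshi remainder also happens to be 196 msat.
    let inbound_msat: u64 = 1_000_392;                            // claimed in full from the previous hop
    let outbound_msat: u64 = 1_000_196;                           // what we forwarded downstream
    let outbound_claimed_msat = (outbound_msat / 1_000) * 1_000;  // on-chain claim, rounded down to 1_000_000
    let fee_earned_msat = inbound_msat - outbound_claimed_msat;   // 392, i.e. about 2 * 196
    assert_eq!(fee_earned_msat, 2 * 196);
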
@@@ -7747,7 -7748,7 +7749,7 @@@ fn test_data_loss_protect() 
  
        // Restore node A from previous state
        logger = test_utils::TestLogger::with_id(format!("node {}", 0));
 -      let mut chain_monitor = <(BlockHash, ChannelMonitor<EnforcingSigner>)>::read(&mut ::std::io::Cursor::new(previous_chain_monitor_state.0), keys_manager).unwrap().1;
 +      let mut chain_monitor = <(BlockHash, ChannelMonitor<EnforcingSigner>)>::read(&mut io::Cursor::new(previous_chain_monitor_state.0), keys_manager).unwrap().1;
        chain_source = test_utils::TestChainSource::new(Network::Testnet);
        tx_broadcaster = test_utils::TestBroadcaster{txn_broadcasted: Mutex::new(Vec::new()), blocks: Arc::new(Mutex::new(Vec::new()))};
        fee_estimator = test_utils::TestFeeEstimator { sat_per_kw: Mutex::new(253) };
        node_state_0 = {
                let mut channel_monitors = HashMap::new();
                channel_monitors.insert(OutPoint { txid: chan.3.txid(), index: 0 }, &mut chain_monitor);
 -              <(BlockHash, ChannelManager<EnforcingSigner, &test_utils::TestChainMonitor, &test_utils::TestBroadcaster, &test_utils::TestKeysInterface, &test_utils::TestFeeEstimator, &test_utils::TestLogger>)>::read(&mut ::std::io::Cursor::new(previous_node_state), ChannelManagerReadArgs {
 +              <(BlockHash, ChannelManager<EnforcingSigner, &test_utils::TestChainMonitor, &test_utils::TestBroadcaster, &test_utils::TestKeysInterface, &test_utils::TestFeeEstimator, &test_utils::TestLogger>)>::read(&mut io::Cursor::new(previous_node_state), ChannelManagerReadArgs {
                        keys_manager: keys_manager,
                        fee_estimator: &fee_estimator,
                        chain_monitor: &monitor,
@@@ -8851,7 -8852,7 +8853,7 @@@ fn test_update_err_monitor_lockdown() 
                let mut w = test_utils::TestVecWriter(Vec::new());
                monitor.write(&mut w).unwrap();
                let new_monitor = <(BlockHash, channelmonitor::ChannelMonitor<EnforcingSigner>)>::read(
 -                              &mut ::std::io::Cursor::new(&w.0), &test_utils::OnlyReadsKeysInterface {}).unwrap().1;
 +                              &mut io::Cursor::new(&w.0), &test_utils::OnlyReadsKeysInterface {}).unwrap().1;
                assert!(new_monitor == *monitor);
                let watchtower = test_utils::TestChainMonitor::new(Some(&chain_source), &chanmon_cfgs[0].tx_broadcaster, &logger, &chanmon_cfgs[0].fee_estimator, &persister, &node_cfgs[0].keys_manager);
                assert!(watchtower.watch_channel(outpoint, new_monitor).is_ok());
@@@ -8913,7 -8914,7 +8915,7 @@@ fn test_concurrent_monitor_claim() 
                let mut w = test_utils::TestVecWriter(Vec::new());
                monitor.write(&mut w).unwrap();
                let new_monitor = <(BlockHash, channelmonitor::ChannelMonitor<EnforcingSigner>)>::read(
 -                              &mut ::std::io::Cursor::new(&w.0), &test_utils::OnlyReadsKeysInterface {}).unwrap().1;
 +                              &mut io::Cursor::new(&w.0), &test_utils::OnlyReadsKeysInterface {}).unwrap().1;
                assert!(new_monitor == *monitor);
                let watchtower = test_utils::TestChainMonitor::new(Some(&chain_source), &chanmon_cfgs[0].tx_broadcaster, &logger, &chanmon_cfgs[0].fee_estimator, &persister, &node_cfgs[0].keys_manager);
                assert!(watchtower.watch_channel(outpoint, new_monitor).is_ok());
                let mut w = test_utils::TestVecWriter(Vec::new());
                monitor.write(&mut w).unwrap();
                let new_monitor = <(BlockHash, channelmonitor::ChannelMonitor<EnforcingSigner>)>::read(
 -                              &mut ::std::io::Cursor::new(&w.0), &test_utils::OnlyReadsKeysInterface {}).unwrap().1;
 +                              &mut io::Cursor::new(&w.0), &test_utils::OnlyReadsKeysInterface {}).unwrap().1;
                assert!(new_monitor == *monitor);
                let watchtower = test_utils::TestChainMonitor::new(Some(&chain_source), &chanmon_cfgs[0].tx_broadcaster, &logger, &chanmon_cfgs[0].fee_estimator, &persister, &node_cfgs[0].keys_manager);
                assert!(watchtower.watch_channel(outpoint, new_monitor).is_ok());
@@@ -9157,6 -9158,7 +9159,7 @@@ fn do_test_onchain_htlc_settlement_afte
        assert_eq!(carol_updates.update_fulfill_htlcs.len(), 1);
  
        nodes[1].node.handle_update_fulfill_htlc(&nodes[2].node.get_our_node_id(), &carol_updates.update_fulfill_htlcs[0]);
+       expect_payment_forwarded!(nodes[1], if go_onchain_before_fulfill || force_closing_node == 1 { None } else { Some(1000) }, false);
        // If Alice broadcasted but Bob doesn't know yet, here he prepares to tell her about the preimage.
        if !go_onchain_before_fulfill && broadcast_alice {
                let events = nodes[1].node.get_and_clear_pending_msg_events();
index 7845c089cf9fa71758a2af8de5272fc1b481e6ae,4e13478db39a6f144d548a02b010e73a1159b79c..bbdb5bfac6bae9270e30bb99434c818020a3d475
@@@ -10,7 -10,6 +10,7 @@@
  //! Further functional tests which test blockchain reorganizations.
  
  use chain::channelmonitor::{ANTI_REORG_DELAY, ChannelMonitor};
 +use chain::transaction::OutPoint;
  use chain::{Confirm, Watch};
  use ln::channelmanager::{ChannelManager, ChannelManagerReadArgs};
  use ln::features::InitFeatures;
@@@ -21,10 -20,7 +21,10 @@@ use util::test_utils
  use util::ser::{ReadableArgs, Writeable};
  
  use bitcoin::blockdata::block::{Block, BlockHeader};
 +use bitcoin::blockdata::script::Builder;
 +use bitcoin::blockdata::opcodes;
  use bitcoin::hash_types::BlockHash;
 +use bitcoin::secp256k1::Secp256k1;
  
  use prelude::*;
  use core::mem;
@@@ -136,8 -132,9 +136,9 @@@ fn do_test_onchain_htlc_reorg(local_com
                connect_block(&nodes[1], &block);
  
                // ChannelManager only polls chain::Watch::release_pending_monitor_events when we
-               // probe it for events, so we probe non-message events here (which should still end up empty):
-               assert_eq!(nodes[1].node.get_and_clear_pending_events().len(), 0);
+               // probe it for events, so we probe non-message events here (which should just be the
+               // PaymentForwarded event).
+               expect_payment_forwarded!(nodes[1], Some(1000), true);
        } else {
                // Confirm the timeout tx and check that we fail the HTLC backwards
                let block = Block {
@@@ -431,112 -428,3 +432,112 @@@ fn test_set_outpoints_partial_claiming(
                node_txn.clear();
        }
  }
 +
 +fn do_test_to_remote_after_local_detection(style: ConnectStyle) {
 +      // In previous code, detection of to_remote outputs in a counterparty commitment transaction
 +      // was dependent on whether a local commitment transaction had been seen on-chain previously.
 +      // This resulted in some edge cases around not being able to generate a SpendableOutput event
 +      // after a reorg.
 +      //
 +      // Here, we test this by first confirming one set of commitment transactions, then
 +      // disconnecting them and reconnecting another. We then confirm them and check that the correct
 +      // SpendableOutput event is generated.
 +      let chanmon_cfgs = create_chanmon_cfgs(2);
 +      let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
 +      let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
 +      let mut nodes = create_network(2, &node_cfgs, &node_chanmgrs);
 +
 +      *nodes[0].connect_style.borrow_mut() = style;
 +      *nodes[1].connect_style.borrow_mut() = style;
 +
 +      let (_, _, chan_id, funding_tx) =
 +              create_announced_chan_between_nodes_with_value(&nodes, 0, 1, 1_000_000, 100_000_000, InitFeatures::known(), InitFeatures::known());
 +      let funding_outpoint = OutPoint { txid: funding_tx.txid(), index: 0 };
 +      assert_eq!(funding_outpoint.to_channel_id(), chan_id);
 +
 +      let remote_txn_a = get_local_commitment_txn!(nodes[0], chan_id);
 +      let remote_txn_b = get_local_commitment_txn!(nodes[1], chan_id);
 +
 +      mine_transaction(&nodes[0], &remote_txn_a[0]);
 +      mine_transaction(&nodes[1], &remote_txn_a[0]);
 +
 +      assert!(nodes[0].node.list_channels().is_empty());
 +      check_closed_broadcast!(nodes[0], true);
 +      check_added_monitors!(nodes[0], 1);
 +      assert!(nodes[1].node.list_channels().is_empty());
 +      check_closed_broadcast!(nodes[1], true);
 +      check_added_monitors!(nodes[1], 1);
 +
 +      // Drop transactions broadcasted in response to the first commitment transaction (we have good
 +      // test coverage of these things already elsewhere).
 +      assert_eq!(nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().split_off(0).len(), 1);
 +      assert_eq!(nodes[1].tx_broadcaster.txn_broadcasted.lock().unwrap().split_off(0).len(), 1);
 +
 +      assert!(nodes[0].chain_monitor.chain_monitor.get_and_clear_pending_events().is_empty());
 +      assert!(nodes[1].chain_monitor.chain_monitor.get_and_clear_pending_events().is_empty());
 +
 +      disconnect_blocks(&nodes[0], 1);
 +      disconnect_blocks(&nodes[1], 1);
 +
 +      assert!(nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().is_empty());
 +      assert!(nodes[1].tx_broadcaster.txn_broadcasted.lock().unwrap().is_empty());
 +      assert!(nodes[0].chain_monitor.chain_monitor.get_and_clear_pending_events().is_empty());
 +      assert!(nodes[1].chain_monitor.chain_monitor.get_and_clear_pending_events().is_empty());
 +
 +      connect_blocks(&nodes[0], ANTI_REORG_DELAY - 1);
 +      connect_blocks(&nodes[1], ANTI_REORG_DELAY - 1);
 +
 +      assert!(nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().is_empty());
 +      assert!(nodes[1].tx_broadcaster.txn_broadcasted.lock().unwrap().is_empty());
 +      assert!(nodes[0].chain_monitor.chain_monitor.get_and_clear_pending_events().is_empty());
 +      assert!(nodes[1].chain_monitor.chain_monitor.get_and_clear_pending_events().is_empty());
 +
 +      mine_transaction(&nodes[0], &remote_txn_b[0]);
 +      mine_transaction(&nodes[1], &remote_txn_b[0]);
 +
 +      assert!(nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().is_empty());
 +      assert!(nodes[1].tx_broadcaster.txn_broadcasted.lock().unwrap().is_empty());
 +      assert!(nodes[0].chain_monitor.chain_monitor.get_and_clear_pending_events().is_empty());
 +      assert!(nodes[1].chain_monitor.chain_monitor.get_and_clear_pending_events().is_empty());
 +
 +      connect_blocks(&nodes[0], ANTI_REORG_DELAY - 1);
 +      connect_blocks(&nodes[1], ANTI_REORG_DELAY - 1);
 +
 +      let mut node_a_spendable = nodes[0].chain_monitor.chain_monitor.get_and_clear_pending_events();
 +      assert_eq!(node_a_spendable.len(), 1);
 +      if let Event::SpendableOutputs { outputs } = node_a_spendable.pop().unwrap() {
 +              assert_eq!(outputs.len(), 1);
 +              let spend_tx = nodes[0].keys_manager.backing.spend_spendable_outputs(&[&outputs[0]], Vec::new(),
 +                      Builder::new().push_opcode(opcodes::all::OP_RETURN).into_script(), 253, &Secp256k1::new()).unwrap();
 +              check_spends!(spend_tx, remote_txn_b[0]);
 +      }
 +
 +      // nodes[1] is waiting for the to_self_delay to expire, which is many more blocks than
 +      // ANTI_REORG_DELAY. Instead, walk it back and confirm the original remote_txn_a commitment
 +      // again and check that nodes[1] generates a similar spendable output.
 +      // Technically a reorg of ANTI_REORG_DELAY violates our assumptions, so this is undefined by
 +      // our API spec, but we currently handle this correctly and there's little reason we shouldn't
 +      // in the future.
 +      assert!(nodes[1].chain_monitor.chain_monitor.get_and_clear_pending_events().is_empty());
 +      disconnect_blocks(&nodes[1], ANTI_REORG_DELAY);
 +      mine_transaction(&nodes[1], &remote_txn_a[0]);
 +      connect_blocks(&nodes[1], ANTI_REORG_DELAY - 1);
 +
 +      let mut node_b_spendable = nodes[1].chain_monitor.chain_monitor.get_and_clear_pending_events();
 +      assert_eq!(node_b_spendable.len(), 1);
 +      if let Event::SpendableOutputs { outputs } = node_b_spendable.pop().unwrap() {
 +              assert_eq!(outputs.len(), 1);
 +              let spend_tx = nodes[1].keys_manager.backing.spend_spendable_outputs(&[&outputs[0]], Vec::new(),
 +                      Builder::new().push_opcode(opcodes::all::OP_RETURN).into_script(), 253, &Secp256k1::new()).unwrap();
 +              check_spends!(spend_tx, remote_txn_a[0]);
 +      }
 +}
 +
 +#[test]
 +fn test_to_remote_after_local_detection() {
 +      do_test_to_remote_after_local_detection(ConnectStyle::BestBlockFirst);
 +      do_test_to_remote_after_local_detection(ConnectStyle::BestBlockFirstSkippingBlocks);
 +      do_test_to_remote_after_local_detection(ConnectStyle::TransactionsFirst);
 +      do_test_to_remote_after_local_detection(ConnectStyle::TransactionsFirstSkippingBlocks);
 +      do_test_to_remote_after_local_detection(ConnectStyle::FullBlockViaListen);
 +}
index 74490eb348b6f78eff96088b4db81b24ce1432ad,34b0b331e9e45d3512c6b65b41fd146904e735b0..1780483cb8b9e01bcab8d5f0c18e812522d27a7f
@@@ -23,7 -23,6 +23,7 @@@ use bitcoin::blockdata::script::Script
  
  use bitcoin::secp256k1::key::PublicKey;
  
 +use io;
  use prelude::*;
  use core::time::Duration;
  use core::ops::Deref;
@@@ -151,10 -150,31 +151,31 @@@ pub enum Event 
                /// The outputs which you should store as spendable by you.
                outputs: Vec<SpendableOutputDescriptor>,
        },
+       /// This event is generated when a payment has been successfully forwarded through us and a
+       /// forwarding fee was earned.
+       PaymentForwarded {
+               /// The fee, in milli-satoshis, which was earned as a result of the payment.
+               ///
+               /// Note that if we force-closed the channel over which we forwarded an HTLC while the HTLC
+               /// was pending, the amount the next hop claimed will have been rounded down to the nearest
+               /// whole satoshi. Thus, the fee calculated here may be higher than expected as we still
+               /// claimed the full value in millisatoshis from the source. In this case,
+               /// `claim_from_onchain_tx` will be set.
+               ///
+               /// If the channel which sent us the payment has been force-closed, we will claim the funds
+               /// via an on-chain transaction. In that case we do not yet know the on-chain transaction
+               /// fees which we will spend and will instead set this to `None`. It is possible duplicate
+               /// fees which we will spend and will instead set this to `None`. Note that duplicate
+               /// `PaymentForwarded` events may be generated for the same payment if and only if
+               /// `fee_earned_msat` is `None`.
+               /// If this is `true`, the forwarded HTLC was claimed by our counterparty via an on-chain
+               /// transaction.
+               claim_from_onchain_tx: bool,
+       },
  }
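
The new variant lets a routing node account for forwarding fees as they are earned. A minimal consumer sketch, assuming only the `Event` type defined in this file (the helper name and the logging output are illustrative, not part of this PR):

    use util::events::Event; // lightning::util::events::Event from outside the crate

    /// Hypothetical helper: report routing income from events drained off a
    /// ChannelManager (e.g. via get_and_clear_pending_events()).
    fn report_forwarding_income(events: &[Event]) {
        for event in events {
            if let Event::PaymentForwarded { fee_earned_msat, claim_from_onchain_tx } = event {
                match fee_earned_msat {
                    // Usual case: the HTLC was claimed off-chain and the exact fee is known.
                    Some(fee) => println!("Forwarded a payment, earned {} msat (on-chain claim: {})", fee, claim_from_onchain_tx),
                    // The channel which sent us the payment was force-closed: the fee is not
                    // yet known, and a duplicate event may follow once it is claimed on-chain.
                    None => println!("Forwarded a payment; fee not yet known (on-chain claim pending)"),
                }
            }
        }
    }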
  
  impl Writeable for Event {
 -      fn write<W: Writer>(&self, writer: &mut W) -> Result<(), ::std::io::Error> {
 +      fn write<W: Writer>(&self, writer: &mut W) -> Result<(), io::Error> {
                match self {
                        &Event::FundingGenerationReady { .. } => {
                                0u8.write(writer)?;
                                        (0, VecWriteWrapper(outputs), required),
                                });
                        },
+                       &Event::PaymentForwarded { fee_earned_msat, claim_from_onchain_tx } => {
+                               7u8.write(writer)?;
+                               write_tlv_fields!(writer, {
+                                       (0, fee_earned_msat, option),
+                                       (2, claim_from_onchain_tx, required),
+                               });
+                       },
                }
                Ok(())
        }
  }
  impl MaybeReadable for Event {
 -      fn read<R: ::std::io::Read>(reader: &mut R) -> Result<Option<Self>, msgs::DecodeError> {
 +      fn read<R: io::Read>(reader: &mut R) -> Result<Option<Self>, msgs::DecodeError> {
                match Readable::read(reader)? {
                        0u8 => Ok(None),
                        1u8 => {
                                };
                                f()
                        },
+                       7u8 => {
+                               let f = || {
+                                       let mut fee_earned_msat = None;
+                                       let mut claim_from_onchain_tx = false;
+                                       read_tlv_fields!(reader, {
+                                               (0, fee_earned_msat, option),
+                                               (2, claim_from_onchain_tx, required),
+                                       });
+                                       Ok(Some(Event::PaymentForwarded { fee_earned_msat, claim_from_onchain_tx }))
+                               };
+                               f()
+                       },
+                       // Versions prior to 0.0.100 did not ignore odd types, instead returning InvalidValue.
+                       x if x % 2 == 1 => Ok(None),
                        _ => Err(msgs::DecodeError::InvalidValue)
                }
        }
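
A rough round-trip sketch of the (de)serialization added above, written as test-style code using the crate's `Writeable::encode` helper and the crate-internal `io` module seen elsewhere in this diff (not part of the PR itself):

    let ev = Event::PaymentForwarded { fee_earned_msat: Some(1000), claim_from_onchain_tx: false };
    let bytes = ev.encode(); // writes the 7u8 discriminant plus TLV fields (0, option) and (2, required)
    match MaybeReadable::read(&mut io::Cursor::new(&bytes)).unwrap() {
        Some(Event::PaymentForwarded { fee_earned_msat, claim_from_onchain_tx }) => {
            assert_eq!(fee_earned_msat, Some(1000));
            assert!(!claim_from_onchain_tx);
        },
        _ => panic!("round-trip should yield the PaymentForwarded variant back"),
    }
    // 7 is an odd type, so under the odd-type rule added above a reader that does not
    // know this variant returns Ok(None) rather than Err(DecodeError::InvalidValue).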