From: Matt Corallo <649246+TheBlueMatt@users.noreply.github.com> Date: Wed, 4 Aug 2021 22:58:14 +0000 (+0000) Subject: Merge pull request #1004 from TheBlueMatt/2021-07-forward-event X-Git-Tag: v0.0.100~10 X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=commitdiff_plain;h=69ee4860848d5992b238eca3343141004d9d1572;hp=-c;p=rust-lightning Merge pull request #1004 from TheBlueMatt/2021-07-forward-event Add a `PaymentForwarded` Event --- 69ee4860848d5992b238eca3343141004d9d1572 diff --combined lightning/src/chain/channelmonitor.rs index 6e2ac3fcf,cec3f41c0..7904d9bde --- a/lightning/src/chain/channelmonitor.rs +++ b/lightning/src/chain/channelmonitor.rs @@@ -53,7 -53,7 +53,7 @@@ use util::events::Event use prelude::*; use core::{cmp, mem}; -use std::io::Error; +use io::{self, Error}; use core::ops::Deref; use sync::Mutex; @@@ -88,7 -88,7 +88,7 @@@ pub struct ChannelMonitorUpdate pub const CLOSED_CHANNEL_UPDATE_ID: u64 = core::u64::MAX; impl Writeable for ChannelMonitorUpdate { - fn write(&self, w: &mut W) -> Result<(), ::std::io::Error> { + fn write(&self, w: &mut W) -> Result<(), io::Error> { write_ver_prefix!(w, SERIALIZATION_VERSION, MIN_SERIALIZATION_VERSION); self.update_id.write(w)?; (self.updates.len() as u64).write(w)?; @@@ -100,7 -100,7 +100,7 @@@ } } impl Readable for ChannelMonitorUpdate { - fn read(r: &mut R) -> Result { + fn read(r: &mut R) -> Result { let _ver = read_ver_prefix!(r, SERIALIZATION_VERSION); let update_id: u64 = Readable::read(r)?; let len: u64 = Readable::read(r)?; @@@ -199,10 -199,12 +199,12 @@@ pub enum MonitorEvent pub struct HTLCUpdate { pub(crate) payment_hash: PaymentHash, pub(crate) payment_preimage: Option, - pub(crate) source: HTLCSource + pub(crate) source: HTLCSource, + pub(crate) onchain_value_satoshis: Option, } impl_writeable_tlv_based!(HTLCUpdate, { (0, payment_hash, required), + (1, onchain_value_satoshis, option), (2, source, required), (4, payment_preimage, option), }); @@@ -293,7 -295,7 +295,7 @@@ struct CounterpartyCommitmentTransactio } impl Writeable for CounterpartyCommitmentTransaction { - fn write(&self, w: &mut W) -> Result<(), ::std::io::Error> { + fn write(&self, w: &mut W) -> Result<(), io::Error> { w.write_all(&byte_utils::be64_to_array(self.per_htlc.len() as u64))?; for (ref txid, ref htlcs) in self.per_htlc.iter() { w.write_all(&txid[..])?; @@@ -311,7 -313,7 +313,7 @@@ } } impl Readable for CounterpartyCommitmentTransaction { - fn read(r: &mut R) -> Result { + fn read(r: &mut R) -> Result { let counterparty_commitment_transaction = { let per_htlc_len: u64 = Readable::read(r)?; let mut per_htlc = HashMap::with_capacity(cmp::min(per_htlc_len as usize, MAX_ALLOC_SIZE / 64)); @@@ -385,6 -387,7 +387,7 @@@ enum OnchainEvent HTLCUpdate { source: HTLCSource, payment_hash: PaymentHash, + onchain_value_satoshis: Option, }, MaturingOutput { descriptor: SpendableOutputDescriptor, @@@ -400,6 -403,7 +403,7 @@@ impl_writeable_tlv_based!(OnchainEventE impl_writeable_tlv_based_enum!(OnchainEvent, (0, HTLCUpdate) => { (0, source, required), + (1, onchain_value_satoshis, option), (2, payment_hash, required), }, (1, MaturingOutput) => { @@@ -1574,6 -1578,7 +1578,7 @@@ impl ChannelMonitorImpl { - self.onchain_events_awaiting_threshold_conf.retain(|ref entry| { - if entry.height != height { return true; } - match entry.event { - OnchainEvent::HTLCUpdate { source: ref update_source, .. } => { - *update_source != $source - }, - _ => true, - } - }); - let entry = OnchainEventEntry { - txid: commitment_txid, - height, - event: OnchainEvent::HTLCUpdate { source: $source, payment_hash: $payment_hash }, - }; - log_trace!(logger, "Failing HTLC with payment_hash {} from {} holder commitment tx due to broadcast of transaction, waiting confirmation (at height{})", log_bytes!($payment_hash.0), $commitment_tx, entry.confirmation_threshold()); - self.onchain_events_awaiting_threshold_conf.push(entry); - } - } - macro_rules! append_onchain_update { ($updates: expr, $to_watch: expr) => { claim_requests = $updates.0; @@@ -1828,11 -1813,30 +1813,30 @@@ } macro_rules! fail_dust_htlcs_after_threshold_conf { - ($holder_tx: expr) => { + ($holder_tx: expr, $commitment_tx: expr) => { for &(ref htlc, _, ref source) in &$holder_tx.htlc_outputs { if htlc.transaction_output_index.is_none() { if let &Some(ref source) = source { - wait_threshold_conf!(source.clone(), "lastest", htlc.payment_hash.clone()); + self.onchain_events_awaiting_threshold_conf.retain(|ref entry| { + if entry.height != height { return true; } + match entry.event { + OnchainEvent::HTLCUpdate { source: ref update_source, .. } => { + update_source != source + }, + _ => true, + } + }); + let entry = OnchainEventEntry { + txid: commitment_txid, + height, + event: OnchainEvent::HTLCUpdate { + source: source.clone(), payment_hash: htlc.payment_hash, + onchain_value_satoshis: Some(htlc.amount_msat / 1000) + }, + }; + log_trace!(logger, "Failing HTLC with payment_hash {} from {} holder commitment tx due to broadcast of transaction, waiting confirmation (at height{})", + log_bytes!(htlc.payment_hash.0), $commitment_tx, entry.confirmation_threshold()); + self.onchain_events_awaiting_threshold_conf.push(entry); } } } @@@ -1840,9 -1844,9 +1844,9 @@@ } if is_holder_tx { - fail_dust_htlcs_after_threshold_conf!(self.current_holder_commitment_tx); + fail_dust_htlcs_after_threshold_conf!(self.current_holder_commitment_tx, "latest"); if let &Some(ref holder_tx) = &self.prev_holder_signed_commitment_tx { - fail_dust_htlcs_after_threshold_conf!(holder_tx); + fail_dust_htlcs_after_threshold_conf!(holder_tx, "previous"); } } @@@ -2090,7 -2094,7 +2094,7 @@@ // Produce actionable events from on-chain events having reached their threshold. for entry in onchain_events_reaching_threshold_conf.drain(..) { match entry.event { - OnchainEvent::HTLCUpdate { ref source, payment_hash } => { + OnchainEvent::HTLCUpdate { ref source, payment_hash, onchain_value_satoshis } => { // Check for duplicate HTLC resolutions. #[cfg(debug_assertions)] { @@@ -2109,9 -2113,10 +2113,10 @@@ log_debug!(logger, "HTLC {} failure update has got enough confirmations to be passed upstream", log_bytes!(payment_hash.0)); self.pending_monitor_events.push(MonitorEvent::HTLCEvent(HTLCUpdate { - payment_hash: payment_hash, + payment_hash, payment_preimage: None, source: source.clone(), + onchain_value_satoshis, })); }, OnchainEvent::MaturingOutput { descriptor } => { @@@ -2328,7 -2333,7 +2333,7 @@@ if pending_htlc.payment_hash == $htlc_output.payment_hash && pending_htlc.amount_msat == $htlc_output.amount_msat { if let &Some(ref source) = pending_source { log_claim!("revoked counterparty commitment tx", false, pending_htlc, true); - payment_data = Some(((**source).clone(), $htlc_output.payment_hash)); + payment_data = Some(((**source).clone(), $htlc_output.payment_hash, $htlc_output.amount_msat)); break; } } @@@ -2348,7 -2353,7 +2353,7 @@@ // transaction. This implies we either learned a preimage, the HTLC // has timed out, or we screwed up. In any case, we should now // resolve the source HTLC with the original sender. - payment_data = Some(((*source).clone(), htlc_output.payment_hash)); + payment_data = Some(((*source).clone(), htlc_output.payment_hash, htlc_output.amount_msat)); } else if !$holder_tx { check_htlc_valid_counterparty!(self.current_counterparty_commitment_txid, htlc_output); if payment_data.is_none() { @@@ -2381,7 -2386,7 +2386,7 @@@ // Check that scan_commitment, above, decided there is some source worth relaying an // HTLC resolution backwards to and figure out whether we learned a preimage from it. - if let Some((source, payment_hash)) = payment_data { + if let Some((source, payment_hash, amount_msat)) = payment_data { let mut payment_preimage = PaymentPreimage([0; 32]); if accepted_preimage_claim { if !self.pending_monitor_events.iter().any( @@@ -2390,7 -2395,8 +2395,8 @@@ self.pending_monitor_events.push(MonitorEvent::HTLCEvent(HTLCUpdate { source, payment_preimage: Some(payment_preimage), - payment_hash + payment_hash, + onchain_value_satoshis: Some(amount_msat / 1000), })); } } else if offered_preimage_claim { @@@ -2402,7 -2408,8 +2408,8 @@@ self.pending_monitor_events.push(MonitorEvent::HTLCEvent(HTLCUpdate { source, payment_preimage: Some(payment_preimage), - payment_hash + payment_hash, + onchain_value_satoshis: Some(amount_msat / 1000), })); } } else { @@@ -2418,7 -2425,10 +2425,10 @@@ let entry = OnchainEventEntry { txid: tx.txid(), height, - event: OnchainEvent::HTLCUpdate { source: source, payment_hash: payment_hash }, + event: OnchainEvent::HTLCUpdate { + source, payment_hash, + onchain_value_satoshis: Some(amount_msat / 1000), + }, }; log_info!(logger, "Failing HTLC with payment_hash {} timeout by a spend tx, waiting for confirmation (at height {})", log_bytes!(payment_hash.0), entry.confirmation_threshold()); self.onchain_events_awaiting_threshold_conf.push(entry); @@@ -2451,8 -2461,7 +2461,8 @@@ output: outp.clone(), }); break; - } else if let Some(ref broadcasted_holder_revokable_script) = self.broadcasted_holder_revokable_script { + } + if let Some(ref broadcasted_holder_revokable_script) = self.broadcasted_holder_revokable_script { if broadcasted_holder_revokable_script.0 == outp.script_pubkey { spendable_output = Some(SpendableOutputDescriptor::DelayedPaymentOutput(DelayedPaymentOutputDescriptor { outpoint: OutPoint { txid: tx.txid(), index: i as u16 }, @@@ -2465,8 -2474,7 +2475,8 @@@ })); break; } - } else if self.counterparty_payment_script == outp.script_pubkey { + } + if self.counterparty_payment_script == outp.script_pubkey { spendable_output = Some(SpendableOutputDescriptor::StaticPaymentOutput(StaticPaymentOutputDescriptor { outpoint: OutPoint { txid: tx.txid(), index: i as u16 }, output: outp.clone(), @@@ -2474,13 -2482,11 +2484,13 @@@ channel_value_satoshis: self.channel_value_satoshis, })); break; - } else if outp.script_pubkey == self.shutdown_script { + } + if outp.script_pubkey == self.shutdown_script { spendable_output = Some(SpendableOutputDescriptor::StaticOutput { outpoint: OutPoint { txid: tx.txid(), index: i as u16 }, output: outp.clone(), }); + break; } } if let Some(spendable_output) = spendable_output { @@@ -2585,7 -2591,7 +2595,7 @@@ const MAX_ALLOC_SIZE: usize = 64*1024 impl<'a, Signer: Sign, K: KeysInterface> ReadableArgs<&'a K> for (BlockHash, ChannelMonitor) { - fn read(reader: &mut R, keys_manager: &'a K) -> Result { + fn read(reader: &mut R, keys_manager: &'a K) -> Result { macro_rules! unwrap_obj { ($key: expr) => { match $key { diff --combined lightning/src/ln/chanmon_update_fail_tests.rs index 00b681341,c3a2f5af3..720190ec8 --- a/lightning/src/ln/chanmon_update_fail_tests.rs +++ b/lightning/src/ln/chanmon_update_fail_tests.rs @@@ -40,7 -40,6 +40,7 @@@ use ln::functional_test_utils::* use util::test_utils; +use io; use prelude::*; use sync::{Arc, Mutex}; @@@ -123,7 -122,7 +123,7 @@@ fn test_monitor_and_persister_update_fa let mut w = test_utils::TestVecWriter(Vec::new()); monitor.write(&mut w).unwrap(); let new_monitor = <(BlockHash, ChannelMonitor)>::read( - &mut ::std::io::Cursor::new(&w.0), &test_utils::OnlyReadsKeysInterface {}).unwrap().1; + &mut io::Cursor::new(&w.0), &test_utils::OnlyReadsKeysInterface {}).unwrap().1; assert!(new_monitor == *monitor); let chain_mon = test_utils::TestChainMonitor::new(Some(&chain_source), &tx_broadcaster, &logger, &chanmon_cfgs[0].fee_estimator, &persister, &node_cfgs[0].keys_manager); assert!(chain_mon.watch_channel(outpoint, new_monitor).is_ok()); @@@ -1160,6 -1159,7 +1160,7 @@@ fn test_monitor_update_fail_reestablish assert!(updates.update_fee.is_none()); assert_eq!(updates.update_fulfill_htlcs.len(), 1); nodes[1].node.handle_update_fulfill_htlc(&nodes[2].node.get_our_node_id(), &updates.update_fulfill_htlcs[0]); + expect_payment_forwarded!(nodes[1], Some(1000), false); check_added_monitors!(nodes[1], 1); assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty()); commitment_signed_dance!(nodes[1], nodes[2], updates.commitment_signed, false); @@@ -2318,6 -2318,7 +2319,7 @@@ fn do_test_reconnect_dup_htlc_claims(ht assert_eq!(fulfill_msg, cs_updates.update_fulfill_htlcs[0]); } nodes[1].node.handle_update_fulfill_htlc(&nodes[2].node.get_our_node_id(), &fulfill_msg); + expect_payment_forwarded!(nodes[1], Some(1000), false); check_added_monitors!(nodes[1], 1); let mut bs_updates = None; diff --combined lightning/src/ln/channel.rs index 10641d0ab,b9a63f1c3..f80377b7d --- a/lightning/src/ln/channel.rs +++ b/lightning/src/ln/channel.rs @@@ -41,7 -41,6 +41,7 @@@ use util::errors::APIError use util::config::{UserConfig,ChannelConfig}; use util::scid_utils::scid_from_parts; +use io; use prelude::*; use core::{cmp,mem,fmt}; use core::ops::Deref; @@@ -307,6 -306,7 +307,7 @@@ pub struct CounterpartyForwardingInfo enum UpdateFulfillFetch { NewClaim { monitor_update: ChannelMonitorUpdate, + htlc_value_msat: u64, msg: Option, }, DuplicateClaim {}, @@@ -320,6 -320,8 +321,8 @@@ pub enum UpdateFulfillCommitFetch NewClaim { /// The ChannelMonitorUpdate which places the new payment preimage in the channel monitor monitor_update: ChannelMonitorUpdate, + /// The value of the HTLC which was claimed, in msat. + htlc_value_msat: u64, /// The update_fulfill message and commitment_signed message (if the claim was not placed /// in the holding cell). msgs: Option<(msgs::UpdateFulfillHTLC, msgs::CommitmentSigned)>, @@@ -337,6 -339,9 +340,9 @@@ // Holder designates channel data owned for the benefice of the user client. // Counterparty designates channel data owned by the another channel participant entity. pub(super) struct Channel { + #[cfg(any(test, feature = "_test_utils"))] + pub(crate) config: ChannelConfig, + #[cfg(not(any(test, feature = "_test_utils")))] config: ChannelConfig, user_id: u64, @@@ -1276,6 -1281,7 +1282,7 @@@ impl Channel // these, but for now we just have to treat them as normal. let mut pending_idx = core::usize::MAX; + let mut htlc_value_msat = 0; for (idx, htlc) in self.pending_inbound_htlcs.iter().enumerate() { if htlc.htlc_id == htlc_id_arg { assert_eq!(htlc.payment_hash, payment_hash_calc); @@@ -1295,6 -1301,7 +1302,7 @@@ } } pending_idx = idx; + htlc_value_msat = htlc.amount_msat; break; } } @@@ -1336,7 -1343,7 +1344,7 @@@ // TODO: We may actually be able to switch to a fulfill here, though its // rare enough it may not be worth the complexity burden. debug_assert!(false, "Tried to fulfill an HTLC that was already failed"); - return UpdateFulfillFetch::NewClaim { monitor_update, msg: None }; + return UpdateFulfillFetch::NewClaim { monitor_update, htlc_value_msat, msg: None }; } }, _ => {} @@@ -1348,7 -1355,7 +1356,7 @@@ }); #[cfg(any(test, feature = "fuzztarget"))] self.historical_inbound_htlc_fulfills.insert(htlc_id_arg); - return UpdateFulfillFetch::NewClaim { monitor_update, msg: None }; + return UpdateFulfillFetch::NewClaim { monitor_update, htlc_value_msat, msg: None }; } #[cfg(any(test, feature = "fuzztarget"))] self.historical_inbound_htlc_fulfills.insert(htlc_id_arg); @@@ -1358,7 -1365,7 +1366,7 @@@ if let InboundHTLCState::Committed = htlc.state { } else { debug_assert!(false, "Have an inbound HTLC we tried to claim before it was fully committed to"); - return UpdateFulfillFetch::NewClaim { monitor_update, msg: None }; + return UpdateFulfillFetch::NewClaim { monitor_update, htlc_value_msat, msg: None }; } log_trace!(logger, "Upgrading HTLC {} to LocalRemoved with a Fulfill in channel {}!", log_bytes!(htlc.payment_hash.0), log_bytes!(self.channel_id)); htlc.state = InboundHTLCState::LocalRemoved(InboundHTLCRemovalReason::Fulfill(payment_preimage_arg.clone())); @@@ -1366,6 -1373,7 +1374,7 @@@ UpdateFulfillFetch::NewClaim { monitor_update, + htlc_value_msat, msg: Some(msgs::UpdateFulfillHTLC { channel_id: self.channel_id(), htlc_id: htlc_id_arg, @@@ -1376,7 -1384,7 +1385,7 @@@ pub fn get_update_fulfill_htlc_and_commit(&mut self, htlc_id: u64, payment_preimage: PaymentPreimage, logger: &L) -> Result where L::Target: Logger { match self.get_update_fulfill_htlc(htlc_id, payment_preimage, logger) { - UpdateFulfillFetch::NewClaim { mut monitor_update, msg: Some(update_fulfill_htlc) } => { + UpdateFulfillFetch::NewClaim { mut monitor_update, htlc_value_msat, msg: Some(update_fulfill_htlc) } => { let (commitment, mut additional_update) = match self.send_commitment_no_status_check(logger) { Err(e) => return Err((e, monitor_update)), Ok(res) => res @@@ -1385,9 -1393,10 +1394,10 @@@ // strictly increasing by one, so decrement it here. self.latest_monitor_update_id = monitor_update.update_id; monitor_update.updates.append(&mut additional_update.updates); - Ok(UpdateFulfillCommitFetch::NewClaim { monitor_update, msgs: Some((update_fulfill_htlc, commitment)) }) + Ok(UpdateFulfillCommitFetch::NewClaim { monitor_update, htlc_value_msat, msgs: Some((update_fulfill_htlc, commitment)) }) }, - UpdateFulfillFetch::NewClaim { monitor_update, msg: None } => Ok(UpdateFulfillCommitFetch::NewClaim { monitor_update, msgs: None }), + UpdateFulfillFetch::NewClaim { monitor_update, htlc_value_msat, msg: None } => + Ok(UpdateFulfillCommitFetch::NewClaim { monitor_update, htlc_value_msat, msgs: None }), UpdateFulfillFetch::DuplicateClaim {} => Ok(UpdateFulfillCommitFetch::DuplicateClaim {}), } } @@@ -2164,7 -2173,7 +2174,7 @@@ /// Marks an outbound HTLC which we have received update_fail/fulfill/malformed #[inline] - fn mark_outbound_htlc_removed(&mut self, htlc_id: u64, check_preimage: Option, fail_reason: Option) -> Result<&HTLCSource, ChannelError> { + fn mark_outbound_htlc_removed(&mut self, htlc_id: u64, check_preimage: Option, fail_reason: Option) -> Result<&OutboundHTLCOutput, ChannelError> { for htlc in self.pending_outbound_htlcs.iter_mut() { if htlc.htlc_id == htlc_id { match check_preimage { @@@ -2183,13 -2192,13 +2193,13 @@@ OutboundHTLCState::AwaitingRemoteRevokeToRemove(_) | OutboundHTLCState::AwaitingRemovedRemoteRevoke(_) | OutboundHTLCState::RemoteRemoved(_) => return Err(ChannelError::Close(format!("Remote tried to fulfill/fail HTLC ({}) that they'd already fulfilled/failed", htlc_id))), } - return Ok(&htlc.source); + return Ok(htlc); } } Err(ChannelError::Close("Remote tried to fulfill/fail an HTLC we couldn't find".to_owned())) } - pub fn update_fulfill_htlc(&mut self, msg: &msgs::UpdateFulfillHTLC) -> Result { + pub fn update_fulfill_htlc(&mut self, msg: &msgs::UpdateFulfillHTLC) -> Result<(HTLCSource, u64), ChannelError> { if (self.channel_state & (ChannelState::ChannelFunded as u32)) != (ChannelState::ChannelFunded as u32) { return Err(ChannelError::Close("Got fulfill HTLC message when channel was not in an operational state".to_owned())); } @@@ -2198,7 -2207,7 +2208,7 @@@ } let payment_hash = PaymentHash(Sha256::hash(&msg.payment_preimage.0[..]).into_inner()); - self.mark_outbound_htlc_removed(msg.htlc_id, Some(payment_hash), None).map(|source| source.clone()) + self.mark_outbound_htlc_removed(msg.htlc_id, Some(payment_hash), None).map(|htlc| (htlc.source.clone(), htlc.amount_msat)) } pub fn update_fail_htlc(&mut self, msg: &msgs::UpdateFailHTLC, fail_reason: HTLCFailReason) -> Result<(), ChannelError> { @@@ -2497,7 -2506,7 +2507,7 @@@ // in it hitting the holding cell again and we cannot change the state of a // holding cell HTLC from fulfill to anything else. let (update_fulfill_msg_option, mut additional_monitor_update) = - if let UpdateFulfillFetch::NewClaim { msg, monitor_update } = self.get_update_fulfill_htlc(htlc_id, *payment_preimage, logger) { + if let UpdateFulfillFetch::NewClaim { msg, monitor_update, .. } = self.get_update_fulfill_htlc(htlc_id, *payment_preimage, logger) { (msg, monitor_update) } else { unreachable!() }; update_fulfill_htlcs.push(update_fulfill_msg_option.unwrap()); @@@ -4513,7 -4522,7 +4523,7 @@@ impl_writeable_tlv_based_enum!(InboundH ); impl Writeable for ChannelUpdateStatus { - fn write(&self, writer: &mut W) -> Result<(), ::std::io::Error> { + fn write(&self, writer: &mut W) -> Result<(), io::Error> { // We only care about writing out the current state as it was announced, ie only either // Enabled or Disabled. In the case of DisabledStaged, we most recently announced the // channel as enabled, so we write 0. For EnabledStaged, we similarly write a 1. @@@ -4528,7 -4537,7 +4538,7 @@@ } impl Readable for ChannelUpdateStatus { - fn read(reader: &mut R) -> Result { + fn read(reader: &mut R) -> Result { Ok(match ::read(reader)? { 0 => ChannelUpdateStatus::Enabled, 1 => ChannelUpdateStatus::Disabled, @@@ -4538,7 -4547,7 +4548,7 @@@ } impl Writeable for Channel { - fn write(&self, writer: &mut W) -> Result<(), ::std::io::Error> { + fn write(&self, writer: &mut W) -> Result<(), io::Error> { // Note that we write out as if remove_uncommitted_htlcs_and_mark_paused had just been // called. @@@ -4771,7 -4780,7 +4781,7 @@@ const MAX_ALLOC_SIZE: usize = 64*1024; impl<'a, Signer: Sign, K: Deref> ReadableArgs<&'a K> for Channel where K::Target: KeysInterface { - fn read(reader: &mut R, keys_source: &'a K) -> Result { + fn read(reader: &mut R, keys_source: &'a K) -> Result { let ver = read_ver_prefix!(reader, SERIALIZATION_VERSION); let user_id = Readable::read(reader)?; diff --combined lightning/src/ln/channelmanager.rs index 7e42705c2,ad9a7e295..afbfd0ee9 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@@ -60,11 -60,10 +60,11 @@@ use util::chacha20::{ChaCha20, ChaChaRe use util::logger::{Logger, Level}; use util::errors::APIError; +use io; use prelude::*; use core::{cmp, mem}; use core::cell::RefCell; -use std::io::{Cursor, Read}; +use io::{Cursor, Read}; use sync::{Arc, Condvar, Mutex, MutexGuard, RwLock, RwLockReadGuard}; use core::sync::atomic::{AtomicUsize, Ordering}; use core::time::Duration; @@@ -208,6 -207,14 +208,14 @@@ pub(super) enum HTLCFailReason } } + /// Return value for claim_funds_from_hop + enum ClaimFundsFromHop { + PrevHopForceClosed, + MonitorUpdateFail(PublicKey, MsgHandleErrInternal, Option), + Success(u64), + DuplicateClaim, + } + type ShutdownResult = (Option<(OutPoint, ChannelMonitorUpdate)>, Vec<(HTLCSource, PaymentHash)>); /// Error type returned across the channel_state mutex boundary. When an Err is generated for a @@@ -2788,16 -2795,22 +2796,22 @@@ impl { - if let msgs::ErrorAction::IgnoreError = e.1.err.action { + ClaimFundsFromHop::MonitorUpdateFail(pk, err, _) => { + if let msgs::ErrorAction::IgnoreError = err.err.action { // We got a temporary failure updating monitor, but will claim the // HTLC when the monitor updating is restored (or on chain). - log_error!(self.logger, "Temporary failure claiming HTLC, treating as success: {}", e.1.err.err); + log_error!(self.logger, "Temporary failure claiming HTLC, treating as success: {}", err.err.err); claimed_any_htlcs = true; - } else { errs.push(e); } + } else { errs.push((pk, err)); } + }, + ClaimFundsFromHop::PrevHopForceClosed => unreachable!("We already checked for channel existence, we can't fail here!"), + ClaimFundsFromHop::DuplicateClaim => { + // While we should never get here in most cases, if we do, it likely + // indicates that the HTLC was timed out some time ago and is no longer + // available to be claimed. Thus, it does not make sense to set + // `claimed_any_htlcs`. }, - Err(None) => unreachable!("We already checked for channel existence, we can't fail here!"), - Ok(()) => claimed_any_htlcs = true, + ClaimFundsFromHop::Success(_) => claimed_any_htlcs = true, } } } @@@ -2815,28 -2828,29 +2829,29 @@@ } else { false } } - fn claim_funds_from_hop(&self, channel_state_lock: &mut MutexGuard>, prev_hop: HTLCPreviousHopData, payment_preimage: PaymentPreimage) -> Result<(), Option<(PublicKey, MsgHandleErrInternal)>> { + fn claim_funds_from_hop(&self, channel_state_lock: &mut MutexGuard>, prev_hop: HTLCPreviousHopData, payment_preimage: PaymentPreimage) -> ClaimFundsFromHop { //TODO: Delay the claimed_funds relaying just like we do outbound relay! let channel_state = &mut **channel_state_lock; let chan_id = match channel_state.short_to_id.get(&prev_hop.short_channel_id) { Some(chan_id) => chan_id.clone(), None => { - return Err(None) + return ClaimFundsFromHop::PrevHopForceClosed } }; if let hash_map::Entry::Occupied(mut chan) = channel_state.by_id.entry(chan_id) { match chan.get_mut().get_update_fulfill_htlc_and_commit(prev_hop.htlc_id, payment_preimage, &self.logger) { Ok(msgs_monitor_option) => { - if let UpdateFulfillCommitFetch::NewClaim { msgs, monitor_update } = msgs_monitor_option { + if let UpdateFulfillCommitFetch::NewClaim { msgs, htlc_value_msat, monitor_update } = msgs_monitor_option { if let Err(e) = self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), monitor_update) { log_given_level!(self.logger, if e == ChannelMonitorUpdateErr::PermanentFailure { Level::Error } else { Level::Debug }, "Failed to update channel monitor with preimage {:?}: {:?}", payment_preimage, e); - return Err(Some(( + return ClaimFundsFromHop::MonitorUpdateFail( chan.get().get_counterparty_node_id(), handle_monitor_err!(self, e, channel_state, chan, RAACommitmentOrder::CommitmentFirst, false, msgs.is_some()).unwrap_err(), - ))); + Some(htlc_value_msat) + ); } if let Some((msg, commitment_signed)) = msgs { log_debug!(self.logger, "Claiming funds for HTLC with preimage {} resulted in a commitment_signed for channel {}", @@@ -2853,8 -2867,10 +2868,10 @@@ } }); } + return ClaimFundsFromHop::Success(htlc_value_msat); + } else { + return ClaimFundsFromHop::DuplicateClaim; } - return Ok(()) }, Err((e, monitor_update)) => { if let Err(e) = self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), monitor_update) { @@@ -2867,13 -2883,13 +2884,13 @@@ if drop { chan.remove_entry(); } - return Err(Some((counterparty_node_id, res))); + return ClaimFundsFromHop::MonitorUpdateFail(counterparty_node_id, res, None); }, } } else { unreachable!(); } } - fn claim_funds_internal(&self, mut channel_state_lock: MutexGuard>, source: HTLCSource, payment_preimage: PaymentPreimage) { + fn claim_funds_internal(&self, mut channel_state_lock: MutexGuard>, source: HTLCSource, payment_preimage: PaymentPreimage, forwarded_htlc_value_msat: Option, from_onchain: bool) { match source { HTLCSource::OutboundRoute { session_priv, .. } => { mem::drop(channel_state_lock); @@@ -2892,29 -2908,51 +2909,51 @@@ }, HTLCSource::PreviousHopData(hop_data) => { let prev_outpoint = hop_data.outpoint; - if let Err((counterparty_node_id, err)) = match self.claim_funds_from_hop(&mut channel_state_lock, hop_data, payment_preimage) { - Ok(()) => Ok(()), - Err(None) => { - let preimage_update = ChannelMonitorUpdate { - update_id: CLOSED_CHANNEL_UPDATE_ID, - updates: vec![ChannelMonitorUpdateStep::PaymentPreimage { - payment_preimage: payment_preimage.clone(), - }], - }; - // We update the ChannelMonitor on the backward link, after - // receiving an offchain preimage event from the forward link (the - // event being update_fulfill_htlc). - if let Err(e) = self.chain_monitor.update_channel(prev_outpoint, preimage_update) { - log_error!(self.logger, "Critical error: failed to update channel monitor with preimage {:?}: {:?}", - payment_preimage, e); - } - Ok(()) - }, - Err(Some(res)) => Err(res), - } { - mem::drop(channel_state_lock); - let res: Result<(), _> = Err(err); - let _ = handle_error!(self, res, counterparty_node_id); + let res = self.claim_funds_from_hop(&mut channel_state_lock, hop_data, payment_preimage); + let claimed_htlc = if let ClaimFundsFromHop::DuplicateClaim = res { false } else { true }; + let htlc_claim_value_msat = match res { + ClaimFundsFromHop::MonitorUpdateFail(_, _, amt_opt) => amt_opt, + ClaimFundsFromHop::Success(amt) => Some(amt), + _ => None, + }; + if let ClaimFundsFromHop::PrevHopForceClosed = res { + let preimage_update = ChannelMonitorUpdate { + update_id: CLOSED_CHANNEL_UPDATE_ID, + updates: vec![ChannelMonitorUpdateStep::PaymentPreimage { + payment_preimage: payment_preimage.clone(), + }], + }; + // We update the ChannelMonitor on the backward link, after + // receiving an offchain preimage event from the forward link (the + // event being update_fulfill_htlc). + if let Err(e) = self.chain_monitor.update_channel(prev_outpoint, preimage_update) { + log_error!(self.logger, "Critical error: failed to update channel monitor with preimage {:?}: {:?}", + payment_preimage, e); + } + // Note that we do *not* set `claimed_htlc` to false here. In fact, this + // totally could be a duplicate claim, but we have no way of knowing + // without interrogating the `ChannelMonitor` we've provided the above + // update to. Instead, we simply document in `PaymentForwarded` that this + // can happen. + } + mem::drop(channel_state_lock); + if let ClaimFundsFromHop::MonitorUpdateFail(pk, err, _) = res { + let result: Result<(), _> = Err(err); + let _ = handle_error!(self, result, pk); + } + + if claimed_htlc { + if let Some(forwarded_htlc_value) = forwarded_htlc_value_msat { + let fee_earned_msat = if let Some(claimed_htlc_value) = htlc_claim_value_msat { + Some(claimed_htlc_value - forwarded_htlc_value) + } else { None }; + + let mut pending_events = self.pending_events.lock().unwrap(); + pending_events.push(events::Event::PaymentForwarded { + fee_earned_msat, + claim_from_onchain_tx: from_onchain, + }); + } } }, } @@@ -3310,7 -3348,7 +3349,7 @@@ fn internal_update_fulfill_htlc(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateFulfillHTLC) -> Result<(), MsgHandleErrInternal> { let mut channel_lock = self.channel_state.lock().unwrap(); - let htlc_source = { + let (htlc_source, forwarded_htlc_value) = { let channel_state = &mut *channel_lock; match channel_state.by_id.entry(msg.channel_id) { hash_map::Entry::Occupied(mut chan) => { @@@ -3322,7 -3360,7 +3361,7 @@@ hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel".to_owned(), msg.channel_id)) } }; - self.claim_funds_internal(channel_lock, htlc_source, msg.payment_preimage.clone()); + self.claim_funds_internal(channel_lock, htlc_source, msg.payment_preimage.clone(), Some(forwarded_htlc_value), false); Ok(()) } @@@ -3689,14 -3727,14 +3728,14 @@@ /// Process pending events from the `chain::Watch`, returning whether any events were processed. fn process_pending_monitor_events(&self) -> bool { let mut failed_channels = Vec::new(); - let pending_monitor_events = self.chain_monitor.release_pending_monitor_events(); + let mut pending_monitor_events = self.chain_monitor.release_pending_monitor_events(); let has_pending_monitor_events = !pending_monitor_events.is_empty(); - for monitor_event in pending_monitor_events { + for monitor_event in pending_monitor_events.drain(..) { match monitor_event { MonitorEvent::HTLCEvent(htlc_update) => { if let Some(preimage) = htlc_update.payment_preimage { log_trace!(self.logger, "Claiming HTLC with preimage {} from our monitor", log_bytes!(preimage.0)); - self.claim_funds_internal(self.channel_state.lock().unwrap(), htlc_update.source, preimage); + self.claim_funds_internal(self.channel_state.lock().unwrap(), htlc_update.source, preimage, htlc_update.onchain_value_satoshis.map(|v| v * 1000), true); } else { log_trace!(self.logger, "Failing HTLC with hash {} from our monitor", log_bytes!(htlc_update.payment_hash.0)); self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), htlc_update.source, &htlc_update.payment_hash, HTLCFailReason::Reason { failure_code: 0x4000 | 8, data: Vec::new() }); @@@ -3991,7 -4029,7 +4030,7 @@@ wher result = NotifyOption::DoPersist; } - let mut pending_events = std::mem::replace(&mut *self.pending_events.lock().unwrap(), vec![]); + let mut pending_events = mem::replace(&mut *self.pending_events.lock().unwrap(), vec![]); if !pending_events.is_empty() { result = NotifyOption::DoPersist; } @@@ -4611,7 -4649,7 +4650,7 @@@ impl_writeable_tlv_based!(HTLCPreviousH }); impl Writeable for ClaimableHTLC { - fn write(&self, writer: &mut W) -> Result<(), ::std::io::Error> { + fn write(&self, writer: &mut W) -> Result<(), io::Error> { let payment_data = match &self.onion_payload { OnionPayload::Invoice(data) => Some(data.clone()), _ => None, @@@ -4715,7 -4753,7 +4754,7 @@@ impl(&self, writer: &mut W) -> Result<(), ::std::io::Error> { + fn write(&self, writer: &mut W) -> Result<(), io::Error> { let _consistency_lock = self.total_consistency_lock.write().unwrap(); write_ver_prefix!(writer, SERIALIZATION_VERSION, MIN_SERIALIZATION_VERSION); @@@ -4911,7 -4949,7 +4950,7 @@@ impl<'a, Signer: Sign, M: Deref, T: Der F::Target: FeeEstimator, L::Target: Logger, { - fn read(reader: &mut R, args: ChannelManagerReadArgs<'a, Signer, M, T, K, F, L>) -> Result { + fn read(reader: &mut R, args: ChannelManagerReadArgs<'a, Signer, M, T, K, F, L>) -> Result { let (blockhash, chan_manager) = <(BlockHash, ChannelManager)>::read(reader, args)?; Ok((blockhash, Arc::new(chan_manager))) } @@@ -4925,7 -4963,7 +4964,7 @@@ impl<'a, Signer: Sign, M: Deref, T: Der F::Target: FeeEstimator, L::Target: Logger, { - fn read(reader: &mut R, mut args: ChannelManagerReadArgs<'a, Signer, M, T, K, F, L>) -> Result { + fn read(reader: &mut R, mut args: ChannelManagerReadArgs<'a, Signer, M, T, K, F, L>) -> Result { let _ver = read_ver_prefix!(reader, SERIALIZATION_VERSION); let genesis_hash: BlockHash = Readable::read(reader)?; @@@ -5115,8 -5153,10 +5154,8 @@@ mod tests { use bitcoin::hashes::Hash; use bitcoin::hashes::sha256::Hash as Sha256; - use core::sync::atomic::{AtomicBool, Ordering}; use core::time::Duration; use ln::{PaymentPreimage, PaymentHash, PaymentSecret}; - use ln::channelmanager::PersistenceNotifier; use ln::features::{InitFeatures, InvoiceFeatures}; use ln::functional_test_utils::*; use ln::msgs; @@@ -5124,15 -5164,12 +5163,15 @@@ use routing::router::{get_keysend_route, get_route}; use util::events::{Event, MessageSendEvent, MessageSendEventsProvider}; use util::test_utils; - use std::sync::Arc; - use std::thread; #[cfg(feature = "std")] #[test] fn test_wait_timeout() { + use ln::channelmanager::PersistenceNotifier; + use sync::Arc; + use core::sync::atomic::{AtomicBool, Ordering}; + use std::thread; + let persistence_notifier = Arc::new(PersistenceNotifier::new()); let thread_notifier = Arc::clone(&persistence_notifier); @@@ -5182,12 -5219,6 +5221,12 @@@ let node_chanmgrs = create_node_chanmgrs(3, &node_cfgs, &[None, None, None]); let nodes = create_network(3, &node_cfgs, &node_chanmgrs); + // All nodes start with a persistable update pending as `create_network` connects each node + // with all other nodes to make most tests simpler. + assert!(nodes[0].node.await_persistable_update_timeout(Duration::from_millis(1))); + assert!(nodes[1].node.await_persistable_update_timeout(Duration::from_millis(1))); + assert!(nodes[2].node.await_persistable_update_timeout(Duration::from_millis(1))); + let mut chan = create_announced_chan_between_nodes(&nodes, 0, 1, InitFeatures::known(), InitFeatures::known()); // We check that the channel info nodes have doesn't change too early, even though we try diff --combined lightning/src/ln/functional_test_utils.rs index feeb691b1,bea2990cc..807bb20f7 --- a/lightning/src/ln/functional_test_utils.rs +++ b/lightning/src/ln/functional_test_utils.rs @@@ -39,7 -39,6 +39,7 @@@ use bitcoin::hash_types::BlockHash use bitcoin::secp256k1::key::PublicKey; +use io; use prelude::*; use core::cell::RefCell; use std::rc::Rc; @@@ -240,7 -239,7 +240,7 @@@ impl<'a, 'b, 'c> Drop for Node<'a, 'b, let mut w = test_utils::TestVecWriter(Vec::new()); let network_graph_ser = self.net_graph_msg_handler.network_graph.read().unwrap(); network_graph_ser.write(&mut w).unwrap(); - let network_graph_deser = ::read(&mut ::std::io::Cursor::new(&w.0)).unwrap(); + let network_graph_deser = ::read(&mut io::Cursor::new(&w.0)).unwrap(); assert!(network_graph_deser == *self.net_graph_msg_handler.network_graph.read().unwrap()); let net_graph_msg_handler = NetGraphMsgHandler::from_net_graph( Some(self.chain_source), self.logger, network_graph_deser @@@ -278,7 -277,7 +278,7 @@@ let mut w = test_utils::TestVecWriter(Vec::new()); old_monitor.write(&mut w).unwrap(); let (_, deserialized_monitor) = <(BlockHash, ChannelMonitor)>::read( - &mut ::std::io::Cursor::new(&w.0), self.keys_manager).unwrap(); + &mut io::Cursor::new(&w.0), self.keys_manager).unwrap(); deserialized_monitors.push(deserialized_monitor); } } @@@ -293,7 -292,7 +293,7 @@@ let mut w = test_utils::TestVecWriter(Vec::new()); self.node.write(&mut w).unwrap(); - <(BlockHash, ChannelManager)>::read(&mut ::std::io::Cursor::new(w.0), ChannelManagerReadArgs { + <(BlockHash, ChannelManager)>::read(&mut io::Cursor::new(w.0), ChannelManagerReadArgs { default_config: *self.node.get_current_default_configuration(), keys_manager: self.keys_manager, fee_estimator: &test_utils::TestFeeEstimator { sat_per_kw: Mutex::new(253) }, @@@ -1006,6 -1005,20 +1006,20 @@@ macro_rules! expect_payment_sent } } + macro_rules! expect_payment_forwarded { + ($node: expr, $expected_fee: expr, $upstream_force_closed: expr) => { + let events = $node.node.get_and_clear_pending_events(); + assert_eq!(events.len(), 1); + match events[0] { + Event::PaymentForwarded { fee_earned_msat, claim_from_onchain_tx } => { + assert_eq!(fee_earned_msat, $expected_fee); + assert_eq!(claim_from_onchain_tx, $upstream_force_closed); + }, + _ => panic!("Unexpected event"), + } + } + } + #[cfg(test)] macro_rules! expect_payment_failure_chan_update { ($node: expr, $scid: expr, $chan_closed: expr) => { @@@ -1170,6 -1183,8 +1184,8 @@@ pub fn claim_payment_along_route<'a, 'b ($node: expr, $prev_node: expr, $new_msgs: expr) => { { $node.node.handle_update_fulfill_htlc(&$prev_node.node.get_our_node_id(), &next_msgs.as_ref().unwrap().0); + let fee = $node.node.channel_state.lock().unwrap().by_id.get(&next_msgs.as_ref().unwrap().0.channel_id).unwrap().config.forwarding_fee_base_msat; + expect_payment_forwarded!($node, Some(fee as u64), false); check_added_monitors!($node, 1); let new_next_msgs = if $new_msgs { let events = $node.node.get_and_clear_pending_msg_events(); @@@ -1405,13 -1420,6 +1421,13 @@@ pub fn create_network<'a, 'b: 'a, 'c: ' }) } + for i in 0..node_count { + for j in (i+1)..node_count { + nodes[i].node.peer_connected(&nodes[j].node.get_our_node_id(), &msgs::Init { features: InitFeatures::known() }); + nodes[j].node.peer_connected(&nodes[i].node.get_our_node_id(), &msgs::Init { features: InitFeatures::known() }); + } + } + nodes } diff --combined lightning/src/ln/functional_tests.rs index 7e9eb4c96,6dec20b2a..6e5395e65 --- a/lightning/src/ln/functional_tests.rs +++ b/lightning/src/ln/functional_tests.rs @@@ -51,7 -51,6 +51,7 @@@ use bitcoin::secp256k1::key::{PublicKey use regex; +use io; use prelude::*; use alloc::collections::BTreeSet; use core::default::Default; @@@ -887,6 -886,7 +887,7 @@@ fn updates_shutdown_wait() assert!(updates.update_fee.is_none()); assert_eq!(updates.update_fulfill_htlcs.len(), 1); nodes[1].node.handle_update_fulfill_htlc(&nodes[2].node.get_our_node_id(), &updates.update_fulfill_htlcs[0]); + expect_payment_forwarded!(nodes[1], Some(1000), false); check_added_monitors!(nodes[1], 1); let updates_2 = get_htlc_update_msgs!(nodes[1], nodes[0].node.get_our_node_id()); commitment_signed_dance!(nodes[1], nodes[2], updates.commitment_signed, false); @@@ -1061,6 -1061,7 +1062,7 @@@ fn do_test_shutdown_rebroadcast(recv_co assert!(updates.update_fee.is_none()); assert_eq!(updates.update_fulfill_htlcs.len(), 1); nodes[1].node.handle_update_fulfill_htlc(&nodes[2].node.get_our_node_id(), &updates.update_fulfill_htlcs[0]); + expect_payment_forwarded!(nodes[1], Some(1000), false); check_added_monitors!(nodes[1], 1); let updates_2 = get_htlc_update_msgs!(nodes[1], nodes[0].node.get_our_node_id()); commitment_signed_dance!(nodes[1], nodes[2], updates.commitment_signed, false); @@@ -2833,6 -2834,12 +2835,12 @@@ fn test_htlc_on_chain_success() assert_eq!(added_monitors[0].0.txid, chan_2.3.txid()); added_monitors.clear(); } + let forwarded_events = nodes[1].node.get_and_clear_pending_events(); + assert_eq!(forwarded_events.len(), 2); + if let Event::PaymentForwarded { fee_earned_msat: Some(1000), claim_from_onchain_tx: true } = forwarded_events[0] { + } else { panic!(); } + if let Event::PaymentForwarded { fee_earned_msat: Some(1000), claim_from_onchain_tx: true } = forwarded_events[1] { + } else { panic!(); } let events = nodes[1].node.get_and_clear_pending_msg_events(); { let mut added_monitors = nodes[1].chain_monitor.added_monitors.lock().unwrap(); @@@ -4550,7 -4557,7 +4558,7 @@@ fn test_dup_htlc_onchain_fails_on_reloa let mut channel_monitors = HashMap::new(); channel_monitors.insert(chan_0_monitor.get_funding_txo().0, &mut chan_0_monitor); <(BlockHash, ChannelManager)> - ::read(&mut std::io::Cursor::new(&chan_manager_serialized.0[..]), ChannelManagerReadArgs { + ::read(&mut io::Cursor::new(&chan_manager_serialized.0[..]), ChannelManagerReadArgs { default_config: Default::default(), keys_manager, fee_estimator: node_cfgs[0].fee_estimator, @@@ -5330,19 -5337,15 +5338,15 @@@ fn test_onchain_to_onchain_claim() // So we broadcast C's commitment tx and HTLC-Success on B's chain, we should successfully be able to extract preimage and update downstream monitor let header = BlockHeader { version: 0x20000000, prev_blockhash: nodes[1].best_block_hash(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42}; connect_block(&nodes[1], &Block { header, txdata: vec![c_txn[1].clone(), c_txn[2].clone()]}); - connect_blocks(&nodes[1], TEST_FINAL_CLTV - 1); // Confirm blocks until the HTLC expires + check_added_monitors!(nodes[1], 1); + expect_payment_forwarded!(nodes[1], Some(1000), true); { let mut b_txn = nodes[1].tx_broadcaster.txn_broadcasted.lock().unwrap(); - // ChannelMonitor: claim tx, ChannelManager: local commitment tx - assert_eq!(b_txn.len(), 2); + // ChannelMonitor: claim tx + assert_eq!(b_txn.len(), 1); check_spends!(b_txn[0], chan_2.3); // B local commitment tx, issued by ChannelManager - check_spends!(b_txn[1], c_txn[1]); // timeout tx on C remote commitment tx, issued by ChannelMonitor - assert_eq!(b_txn[1].input[0].witness.clone().last().unwrap().len(), ACCEPTED_HTLC_SCRIPT_WEIGHT); - assert!(b_txn[1].output[0].script_pubkey.is_v0_p2wpkh()); // direct payment - assert_ne!(b_txn[1].lock_time, 0); // Timeout tx b_txn.clear(); } - check_added_monitors!(nodes[1], 1); let msg_events = nodes[1].node.get_and_clear_pending_msg_events(); assert_eq!(msg_events.len(), 3); check_added_monitors!(nodes[1], 1); @@@ -5368,19 -5371,14 +5372,14 @@@ let commitment_tx = get_local_commitment_txn!(nodes[0], chan_1.2); mine_transaction(&nodes[1], &commitment_tx[0]); let b_txn = nodes[1].tx_broadcaster.txn_broadcasted.lock().unwrap(); - // ChannelMonitor: HTLC-Success tx + HTLC-Timeout RBF Bump, ChannelManager: local commitment tx + HTLC-Success tx - assert_eq!(b_txn.len(), 4); - check_spends!(b_txn[2], chan_1.3); - check_spends!(b_txn[3], b_txn[2]); - let (htlc_success_claim, htlc_timeout_bumped) = - if b_txn[0].input[0].previous_output.txid == commitment_tx[0].txid() - { (&b_txn[0], &b_txn[1]) } else { (&b_txn[1], &b_txn[0]) }; - check_spends!(htlc_success_claim, commitment_tx[0]); - assert_eq!(htlc_success_claim.input[0].witness.clone().last().unwrap().len(), OFFERED_HTLC_SCRIPT_WEIGHT); - assert!(htlc_success_claim.output[0].script_pubkey.is_v0_p2wpkh()); // direct payment - assert_eq!(htlc_success_claim.lock_time, 0); // Success tx - check_spends!(htlc_timeout_bumped, c_txn[1]); // timeout tx on C remote commitment tx, issued by ChannelMonitor - assert_ne!(htlc_timeout_bumped.lock_time, 0); // Success tx + // ChannelMonitor: HTLC-Success tx, ChannelManager: local commitment tx + HTLC-Success tx + assert_eq!(b_txn.len(), 3); + check_spends!(b_txn[1], chan_1.3); + check_spends!(b_txn[2], b_txn[1]); + check_spends!(b_txn[0], commitment_tx[0]); + assert_eq!(b_txn[0].input[0].witness.clone().last().unwrap().len(), OFFERED_HTLC_SCRIPT_WEIGHT); + assert!(b_txn[0].output[0].script_pubkey.is_v0_p2wpkh()); // direct payment + assert_eq!(b_txn[0].lock_time, 0); // Success tx check_closed_broadcast!(nodes[1], true); check_added_monitors!(nodes[1], 1); @@@ -5498,7 -5496,10 +5497,10 @@@ fn test_duplicate_payment_hash_one_fail expect_payment_failed!(nodes[0], duplicate_payment_hash, false); // Solve 2nd HTLC by broadcasting on B's chain HTLC-Success Tx from C + // Note that the fee paid is effectively double as the HTLC value (including the nodes[1] fee + // and nodes[2] fee) is rounded down and then claimed in full. mine_transaction(&nodes[1], &htlc_success_txn[0]); + expect_payment_forwarded!(nodes[1], Some(196*2), true); let updates = get_htlc_update_msgs!(nodes[1], nodes[0].node.get_our_node_id()); assert!(updates.update_add_htlcs.is_empty()); assert!(updates.update_fail_htlcs.is_empty()); @@@ -7747,7 -7748,7 +7749,7 @@@ fn test_data_loss_protect() // Restore node A from previous state logger = test_utils::TestLogger::with_id(format!("node {}", 0)); - let mut chain_monitor = <(BlockHash, ChannelMonitor)>::read(&mut ::std::io::Cursor::new(previous_chain_monitor_state.0), keys_manager).unwrap().1; + let mut chain_monitor = <(BlockHash, ChannelMonitor)>::read(&mut io::Cursor::new(previous_chain_monitor_state.0), keys_manager).unwrap().1; chain_source = test_utils::TestChainSource::new(Network::Testnet); tx_broadcaster = test_utils::TestBroadcaster{txn_broadcasted: Mutex::new(Vec::new()), blocks: Arc::new(Mutex::new(Vec::new()))}; fee_estimator = test_utils::TestFeeEstimator { sat_per_kw: Mutex::new(253) }; @@@ -7756,7 -7757,7 +7758,7 @@@ node_state_0 = { let mut channel_monitors = HashMap::new(); channel_monitors.insert(OutPoint { txid: chan.3.txid(), index: 0 }, &mut chain_monitor); - <(BlockHash, ChannelManager)>::read(&mut ::std::io::Cursor::new(previous_node_state), ChannelManagerReadArgs { + <(BlockHash, ChannelManager)>::read(&mut io::Cursor::new(previous_node_state), ChannelManagerReadArgs { keys_manager: keys_manager, fee_estimator: &fee_estimator, chain_monitor: &monitor, @@@ -8851,7 -8852,7 +8853,7 @@@ fn test_update_err_monitor_lockdown() let mut w = test_utils::TestVecWriter(Vec::new()); monitor.write(&mut w).unwrap(); let new_monitor = <(BlockHash, channelmonitor::ChannelMonitor)>::read( - &mut ::std::io::Cursor::new(&w.0), &test_utils::OnlyReadsKeysInterface {}).unwrap().1; + &mut io::Cursor::new(&w.0), &test_utils::OnlyReadsKeysInterface {}).unwrap().1; assert!(new_monitor == *monitor); let watchtower = test_utils::TestChainMonitor::new(Some(&chain_source), &chanmon_cfgs[0].tx_broadcaster, &logger, &chanmon_cfgs[0].fee_estimator, &persister, &node_cfgs[0].keys_manager); assert!(watchtower.watch_channel(outpoint, new_monitor).is_ok()); @@@ -8913,7 -8914,7 +8915,7 @@@ fn test_concurrent_monitor_claim() let mut w = test_utils::TestVecWriter(Vec::new()); monitor.write(&mut w).unwrap(); let new_monitor = <(BlockHash, channelmonitor::ChannelMonitor)>::read( - &mut ::std::io::Cursor::new(&w.0), &test_utils::OnlyReadsKeysInterface {}).unwrap().1; + &mut io::Cursor::new(&w.0), &test_utils::OnlyReadsKeysInterface {}).unwrap().1; assert!(new_monitor == *monitor); let watchtower = test_utils::TestChainMonitor::new(Some(&chain_source), &chanmon_cfgs[0].tx_broadcaster, &logger, &chanmon_cfgs[0].fee_estimator, &persister, &node_cfgs[0].keys_manager); assert!(watchtower.watch_channel(outpoint, new_monitor).is_ok()); @@@ -8942,7 -8943,7 +8944,7 @@@ let mut w = test_utils::TestVecWriter(Vec::new()); monitor.write(&mut w).unwrap(); let new_monitor = <(BlockHash, channelmonitor::ChannelMonitor)>::read( - &mut ::std::io::Cursor::new(&w.0), &test_utils::OnlyReadsKeysInterface {}).unwrap().1; + &mut io::Cursor::new(&w.0), &test_utils::OnlyReadsKeysInterface {}).unwrap().1; assert!(new_monitor == *monitor); let watchtower = test_utils::TestChainMonitor::new(Some(&chain_source), &chanmon_cfgs[0].tx_broadcaster, &logger, &chanmon_cfgs[0].fee_estimator, &persister, &node_cfgs[0].keys_manager); assert!(watchtower.watch_channel(outpoint, new_monitor).is_ok()); @@@ -9157,6 -9158,7 +9159,7 @@@ fn do_test_onchain_htlc_settlement_afte assert_eq!(carol_updates.update_fulfill_htlcs.len(), 1); nodes[1].node.handle_update_fulfill_htlc(&nodes[2].node.get_our_node_id(), &carol_updates.update_fulfill_htlcs[0]); + expect_payment_forwarded!(nodes[1], if go_onchain_before_fulfill || force_closing_node == 1 { None } else { Some(1000) }, false); // If Alice broadcasted but Bob doesn't know yet, here he prepares to tell her about the preimage. if !go_onchain_before_fulfill && broadcast_alice { let events = nodes[1].node.get_and_clear_pending_msg_events(); diff --combined lightning/src/ln/reorg_tests.rs index 7845c089c,4e13478db..bbdb5bfac --- a/lightning/src/ln/reorg_tests.rs +++ b/lightning/src/ln/reorg_tests.rs @@@ -10,7 -10,6 +10,7 @@@ //! Further functional tests which test blockchain reorganizations. use chain::channelmonitor::{ANTI_REORG_DELAY, ChannelMonitor}; +use chain::transaction::OutPoint; use chain::{Confirm, Watch}; use ln::channelmanager::{ChannelManager, ChannelManagerReadArgs}; use ln::features::InitFeatures; @@@ -21,10 -20,7 +21,10 @@@ use util::test_utils use util::ser::{ReadableArgs, Writeable}; use bitcoin::blockdata::block::{Block, BlockHeader}; +use bitcoin::blockdata::script::Builder; +use bitcoin::blockdata::opcodes; use bitcoin::hash_types::BlockHash; +use bitcoin::secp256k1::Secp256k1; use prelude::*; use core::mem; @@@ -136,8 -132,9 +136,9 @@@ fn do_test_onchain_htlc_reorg(local_com connect_block(&nodes[1], &block); // ChannelManager only polls chain::Watch::release_pending_monitor_events when we - // probe it for events, so we probe non-message events here (which should still end up empty): - assert_eq!(nodes[1].node.get_and_clear_pending_events().len(), 0); + // probe it for events, so we probe non-message events here (which should just be the + // PaymentForwarded event). + expect_payment_forwarded!(nodes[1], Some(1000), true); } else { // Confirm the timeout tx and check that we fail the HTLC backwards let block = Block { @@@ -431,112 -428,3 +432,112 @@@ fn test_set_outpoints_partial_claiming( node_txn.clear(); } } + +fn do_test_to_remote_after_local_detection(style: ConnectStyle) { + // In previous code, detection of to_remote outputs in a counterparty commitment transaction + // was dependent on whether a local commitment transaction had been seen on-chain previously. + // This resulted in some edge cases around not being able to generate a SpendableOutput event + // after a reorg. + // + // Here, we test this by first confirming one set of commitment transactions, then + // disconnecting them and reconnecting another. We then confirm them and check that the correct + // SpendableOutput event is generated. + let chanmon_cfgs = create_chanmon_cfgs(2); + let node_cfgs = create_node_cfgs(2, &chanmon_cfgs); + let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]); + let mut nodes = create_network(2, &node_cfgs, &node_chanmgrs); + + *nodes[0].connect_style.borrow_mut() = style; + *nodes[1].connect_style.borrow_mut() = style; + + let (_, _, chan_id, funding_tx) = + create_announced_chan_between_nodes_with_value(&nodes, 0, 1, 1_000_000, 100_000_000, InitFeatures::known(), InitFeatures::known()); + let funding_outpoint = OutPoint { txid: funding_tx.txid(), index: 0 }; + assert_eq!(funding_outpoint.to_channel_id(), chan_id); + + let remote_txn_a = get_local_commitment_txn!(nodes[0], chan_id); + let remote_txn_b = get_local_commitment_txn!(nodes[1], chan_id); + + mine_transaction(&nodes[0], &remote_txn_a[0]); + mine_transaction(&nodes[1], &remote_txn_a[0]); + + assert!(nodes[0].node.list_channels().is_empty()); + check_closed_broadcast!(nodes[0], true); + check_added_monitors!(nodes[0], 1); + assert!(nodes[1].node.list_channels().is_empty()); + check_closed_broadcast!(nodes[1], true); + check_added_monitors!(nodes[1], 1); + + // Drop transactions broadcasted in response to the first commitment transaction (we have good + // test coverage of these things already elsewhere). + assert_eq!(nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().split_off(0).len(), 1); + assert_eq!(nodes[1].tx_broadcaster.txn_broadcasted.lock().unwrap().split_off(0).len(), 1); + + assert!(nodes[0].chain_monitor.chain_monitor.get_and_clear_pending_events().is_empty()); + assert!(nodes[1].chain_monitor.chain_monitor.get_and_clear_pending_events().is_empty()); + + disconnect_blocks(&nodes[0], 1); + disconnect_blocks(&nodes[1], 1); + + assert!(nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().is_empty()); + assert!(nodes[1].tx_broadcaster.txn_broadcasted.lock().unwrap().is_empty()); + assert!(nodes[0].chain_monitor.chain_monitor.get_and_clear_pending_events().is_empty()); + assert!(nodes[1].chain_monitor.chain_monitor.get_and_clear_pending_events().is_empty()); + + connect_blocks(&nodes[0], ANTI_REORG_DELAY - 1); + connect_blocks(&nodes[1], ANTI_REORG_DELAY - 1); + + assert!(nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().is_empty()); + assert!(nodes[1].tx_broadcaster.txn_broadcasted.lock().unwrap().is_empty()); + assert!(nodes[0].chain_monitor.chain_monitor.get_and_clear_pending_events().is_empty()); + assert!(nodes[1].chain_monitor.chain_monitor.get_and_clear_pending_events().is_empty()); + + mine_transaction(&nodes[0], &remote_txn_b[0]); + mine_transaction(&nodes[1], &remote_txn_b[0]); + + assert!(nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().is_empty()); + assert!(nodes[1].tx_broadcaster.txn_broadcasted.lock().unwrap().is_empty()); + assert!(nodes[0].chain_monitor.chain_monitor.get_and_clear_pending_events().is_empty()); + assert!(nodes[1].chain_monitor.chain_monitor.get_and_clear_pending_events().is_empty()); + + connect_blocks(&nodes[0], ANTI_REORG_DELAY - 1); + connect_blocks(&nodes[1], ANTI_REORG_DELAY - 1); + + let mut node_a_spendable = nodes[0].chain_monitor.chain_monitor.get_and_clear_pending_events(); + assert_eq!(node_a_spendable.len(), 1); + if let Event::SpendableOutputs { outputs } = node_a_spendable.pop().unwrap() { + assert_eq!(outputs.len(), 1); + let spend_tx = nodes[0].keys_manager.backing.spend_spendable_outputs(&[&outputs[0]], Vec::new(), + Builder::new().push_opcode(opcodes::all::OP_RETURN).into_script(), 253, &Secp256k1::new()).unwrap(); + check_spends!(spend_tx, remote_txn_b[0]); + } + + // nodes[1] is waiting for the to_self_delay to expire, which is many more than + // ANTI_REORG_DELAY. Instead, walk it back and confirm the original remote_txn_a commitment + // again and check that nodes[1] generates a similar spendable output. + // Technically a reorg of ANTI_REORG_DELAY violates our assumptions, so this is undefined by + // our API spec, but we currently handle this correctly and there's little reason we shouldn't + // in the future. + assert!(nodes[1].chain_monitor.chain_monitor.get_and_clear_pending_events().is_empty()); + disconnect_blocks(&nodes[1], ANTI_REORG_DELAY); + mine_transaction(&nodes[1], &remote_txn_a[0]); + connect_blocks(&nodes[1], ANTI_REORG_DELAY - 1); + + let mut node_b_spendable = nodes[1].chain_monitor.chain_monitor.get_and_clear_pending_events(); + assert_eq!(node_b_spendable.len(), 1); + if let Event::SpendableOutputs { outputs } = node_b_spendable.pop().unwrap() { + assert_eq!(outputs.len(), 1); + let spend_tx = nodes[1].keys_manager.backing.spend_spendable_outputs(&[&outputs[0]], Vec::new(), + Builder::new().push_opcode(opcodes::all::OP_RETURN).into_script(), 253, &Secp256k1::new()).unwrap(); + check_spends!(spend_tx, remote_txn_a[0]); + } +} + +#[test] +fn test_to_remote_after_local_detection() { + do_test_to_remote_after_local_detection(ConnectStyle::BestBlockFirst); + do_test_to_remote_after_local_detection(ConnectStyle::BestBlockFirstSkippingBlocks); + do_test_to_remote_after_local_detection(ConnectStyle::TransactionsFirst); + do_test_to_remote_after_local_detection(ConnectStyle::TransactionsFirstSkippingBlocks); + do_test_to_remote_after_local_detection(ConnectStyle::FullBlockViaListen); +} diff --combined lightning/src/util/events.rs index 74490eb34,34b0b331e..1780483cb --- a/lightning/src/util/events.rs +++ b/lightning/src/util/events.rs @@@ -23,7 -23,6 +23,7 @@@ use bitcoin::blockdata::script::Script use bitcoin::secp256k1::key::PublicKey; +use io; use prelude::*; use core::time::Duration; use core::ops::Deref; @@@ -151,10 -150,31 +151,31 @@@ pub enum Event /// The outputs which you should store as spendable by you. outputs: Vec, }, + /// This event is generated when a payment has been successfully forwarded through us and a + /// forwarding fee earned. + PaymentForwarded { + /// The fee, in milli-satoshis, which was earned as a result of the payment. + /// + /// Note that if we force-closed the channel over which we forwarded an HTLC while the HTLC + /// was pending, the amount the next hop claimed will have been rounded down to the nearest + /// whole satoshi. Thus, the fee calculated here may be higher than expected as we still + /// claimed the full value in millisatoshis from the source. In this case, + /// `claim_from_onchain_tx` will be set. + /// + /// If the channel which sent us the payment has been force-closed, we will claim the funds + /// via an on-chain transaction. In that case we do not yet know the on-chain transaction + /// fees which we will spend and will instead set this to `None`. It is possible duplicate + /// `PaymentForwarded` events are generated for the same payment iff `fee_earned_msat` is + /// `None`. + fee_earned_msat: Option, + /// If this is `true`, the forwarded HTLC was claimed by our counterparty via an on-chain + /// transaction. + claim_from_onchain_tx: bool, + }, } impl Writeable for Event { - fn write(&self, writer: &mut W) -> Result<(), ::std::io::Error> { + fn write(&self, writer: &mut W) -> Result<(), io::Error> { match self { &Event::FundingGenerationReady { .. } => { 0u8.write(writer)?; @@@ -218,12 -238,19 +239,19 @@@ (0, VecWriteWrapper(outputs), required), }); }, + &Event::PaymentForwarded { fee_earned_msat, claim_from_onchain_tx } => { + 7u8.write(writer)?; + write_tlv_fields!(writer, { + (0, fee_earned_msat, option), + (2, claim_from_onchain_tx, required), + }); + }, } Ok(()) } } impl MaybeReadable for Event { - fn read(reader: &mut R) -> Result, msgs::DecodeError> { + fn read(reader: &mut R) -> Result, msgs::DecodeError> { match Readable::read(reader)? { 0u8 => Ok(None), 1u8 => { @@@ -313,6 -340,20 +341,20 @@@ }; f() }, + 7u8 => { + let f = || { + let mut fee_earned_msat = None; + let mut claim_from_onchain_tx = false; + read_tlv_fields!(reader, { + (0, fee_earned_msat, option), + (2, claim_from_onchain_tx, required), + }); + Ok(Some(Event::PaymentForwarded { fee_earned_msat, claim_from_onchain_tx })) + }; + f() + }, + // Versions prior to 0.0.100 did not ignore odd types, instead returning InvalidValue. + x if x % 2 == 1 => Ok(None), _ => Err(msgs::DecodeError::InvalidValue) } }