From 43c4ffe3c7f7b23853091958d83eda802133b5af Mon Sep 17 00:00:00 2001 From: Antoine Riard Date: Fri, 30 Nov 2018 11:06:02 -0500 Subject: [PATCH] Add is_resolving_output in ChannelMonitor Called in ChannelMonitor block_connected, returning pending_htlc_updated upstream via ManyChannelMonitor to link htlcs between monitors. Used by ChannelManager to fulfill/fail htlcs backwards accordingly If spurrious htlc updates are generated due to block re-scan and htlc are already LocalRemoved, discard them in channel get_update_*_htlc --- src/ln/channel.rs | 30 +++++-- src/ln/channelmanager.rs | 118 +++++++++++++++++++++++++-- src/ln/channelmonitor.rs | 168 ++++++++++++++++++++++++++++++++++++++- src/util/test_utils.rs | 5 ++ 4 files changed, 302 insertions(+), 19 deletions(-) diff --git a/src/ln/channel.rs b/src/ln/channel.rs index f3b812d00..fabad3b4f 100644 --- a/src/ln/channel.rs +++ b/src/ln/channel.rs @@ -1140,10 +1140,17 @@ impl Channel { for (idx, htlc) in self.pending_inbound_htlcs.iter().enumerate() { if htlc.htlc_id == htlc_id_arg { assert_eq!(htlc.payment_hash, payment_hash_calc); - if let InboundHTLCState::Committed = htlc.state { - } else { - debug_assert!(false, "Have an inbound HTLC we tried to claim before it was fully committed to"); - // Don't return in release mode here so that we can update channel_monitor + match htlc.state { + InboundHTLCState::Committed => {}, + InboundHTLCState::LocalRemoved(_) => { + //TODO (ariard) We may have spurrious HTLC update events from ChannelMonitor due to block re-scan + // Should we discard them ? + return Ok((None, None)); + }, + _ => { + debug_assert!(false, "Have an inbound HTLC we tried to claim before it was fully committed to"); + // Don't return in release mode here so that we can update channel_monitor + } } pending_idx = idx; break; @@ -1226,10 +1233,17 @@ impl Channel { let mut pending_idx = std::usize::MAX; for (idx, htlc) in self.pending_inbound_htlcs.iter().enumerate() { if htlc.htlc_id == htlc_id_arg { - if let InboundHTLCState::Committed = htlc.state { - } else { - debug_assert!(false, "Have an inbound HTLC we tried to fail before it was fully committed to"); - return Err(ChannelError::Ignore("Unable to find a pending HTLC which matched the given HTLC ID")); + match htlc.state { + InboundHTLCState::Committed => {}, + InboundHTLCState::LocalRemoved(_) => { + //TODO (ariard) We may have spurrious HTLC update events from ChannelMonitor due to block re-scan + // Should we discard them ? + return Ok(None); + }, + _ => { + debug_assert!(false, "Have an inbound HTLC we tried to claim before it was fully committed to"); + // Don't return in release mode here so that we can update channel_monitor + } } pending_idx = idx; } diff --git a/src/ln/channelmanager.rs b/src/ln/channelmanager.rs index e8deecdcb..c73a1051a 100644 --- a/src/ln/channelmanager.rs +++ b/src/ln/channelmanager.rs @@ -624,13 +624,7 @@ impl ChannelManager { for tx in local_txn { self.tx_broadcaster.broadcast_transaction(&tx); } - //TODO: We need to have a way where outbound HTLC claims can result in us claiming the - //now-on-chain HTLC output for ourselves (and, thereafter, passing the HTLC backwards). - //TODO: We need to handle monitoring of pending offered HTLCs which just hit the chain and - //may be claimed, resulting in us claiming the inbound HTLCs (and back-failing after - //timeouts are hit and our claims confirm). - //TODO: In any case, we need to make sure we remove any pending htlc tracking (via - //fail_backwards or claim_funds) eventually for all HTLCs that were in the channel + } /// Force closes a channel, immediately broadcasting the latest local commitment transaction to @@ -2661,6 +2655,17 @@ impl ChainListener for ChannelManager { for failure in failed_channels.drain(..) { self.finish_force_close_channel(failure); } + { + let mut channel_state = Some(self.channel_state.lock().unwrap()); + for (_, payment_preimage, htlc_source) in self.monitor.fetch_pending_htlc_updated() { + if let Some(source) = htlc_source { + if let Some(preimage) = payment_preimage { + if channel_state.is_none() { channel_state = Some(self.channel_state.lock().unwrap());} + self.claim_funds_internal(channel_state.take().unwrap(), source, preimage); + } + } + } + } self.latest_block_height.store(height as usize, Ordering::Release); *self.last_block_hash.try_lock().expect("block_(dis)connected must not be called in parallel") = header.bitcoin_hash(); } @@ -6064,6 +6069,105 @@ mod tests { assert_eq!(nodes[1].node.list_channels().len(), 0); } + #[test] + fn test_htlc_on_chain_success() { + // Test that in case of an unilateral close onchain, we detect the state of output thanks to + // ChainWatchInterface and pass the preimage backward accordingly. So here we test that ChannelManager is + // broadcasting the right event to other nodes in payment path. + // A --------------------> B ----------------------> C (preimage) + // A's commitment tx C's commitment tx + // \ \ + // B's preimage tx C's HTLC Success tx + + let nodes = create_network(3); + + // Create some initial channels + let chan_1 = create_announced_chan_between_nodes(&nodes, 0, 1); + let chan_2 = create_announced_chan_between_nodes(&nodes, 1, 2); + + // Rebalance the network a bit by relaying one payment through all the channels... + send_payment(&nodes[0], &vec!(&nodes[1], &nodes[2])[..], 8000000); + send_payment(&nodes[0], &vec!(&nodes[1], &nodes[2])[..], 8000000); + + let (payment_preimage, _payment_hash) = route_payment(&nodes[0], &vec!(&nodes[1], &nodes[2]), 3000000); + let header = BlockHeader { version: 0x20000000, prev_blockhash: Default::default(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42}; + + // Broadcast legit commitment tx from C on B's chain + // Broadcast HTLC Success transation by C on received output from C's commitment tx on B's chain + let commitment_tx = nodes[2].node.channel_state.lock().unwrap().by_id.get(&chan_2.2).unwrap().last_local_commitment_txn.clone(); + nodes[2].node.claim_funds(payment_preimage); + { + let mut added_monitors = nodes[2].chan_monitor.added_monitors.lock().unwrap(); + assert_eq!(added_monitors.len(), 1); + added_monitors.clear(); + } + let events = nodes[2].node.get_and_clear_pending_msg_events(); + assert_eq!(events.len(), 1); + match events[0] { + MessageSendEvent::UpdateHTLCs { ref node_id, updates: msgs::CommitmentUpdate { ref update_add_htlcs, ref update_fulfill_htlcs, ref update_fail_htlcs, ref update_fail_malformed_htlcs, .. } } => { + assert!(update_add_htlcs.is_empty()); + assert!(update_fail_htlcs.is_empty()); + assert!(!update_fulfill_htlcs.is_empty()); + assert!(update_fail_malformed_htlcs.is_empty()); + assert_eq!(nodes[1].node.get_our_node_id(), *node_id); + }, + _ => panic!("Unexpected event"), + }; + nodes[2].chain_monitor.block_connected_with_filtering(&Block { header, txdata: vec![commitment_tx[0].clone()]}, 1); + let events = nodes[2].node.get_and_clear_pending_msg_events(); + assert_eq!(events.len(), 1); + match events[0] { + MessageSendEvent::BroadcastChannelUpdate { .. } => {}, + _ => panic!("Unexpected event"), + } + let node_txn = nodes[2].tx_broadcaster.txn_broadcasted.lock().unwrap().clone(); + + // Verify that B's ChannelManager is able to extract preimage from HTLC Success tx and pass it backward + nodes[1].chain_monitor.block_connected_with_filtering(&Block { header, txdata: node_txn}, 1); + { + let mut added_monitors = nodes[1].chan_monitor.added_monitors.lock().unwrap(); + assert_eq!(added_monitors.len(), 1); + added_monitors.clear(); + } + let events = nodes[1].node.get_and_clear_pending_msg_events(); + assert_eq!(events.len(), 2); + match events[0] { + MessageSendEvent::BroadcastChannelUpdate { .. } => {}, + _ => panic!("Unexpected event"), + } + match events[1] { + MessageSendEvent::UpdateHTLCs { ref node_id, updates: msgs::CommitmentUpdate { ref update_add_htlcs, ref update_fail_htlcs, ref update_fulfill_htlcs, ref update_fail_malformed_htlcs, .. } } => { + assert!(update_add_htlcs.is_empty()); + assert!(update_fail_htlcs.is_empty()); + assert!(!update_fulfill_htlcs.is_empty()); + assert!(update_fail_malformed_htlcs.is_empty()); + assert_eq!(nodes[0].node.get_our_node_id(), *node_id); + }, + _ => panic!("Unexpected event"), + }; + + // Broadcast legit commitment tx from A on B's chain + // Broadcast preimage tx by B on offered output from A commitment tx on A's chain + let commitment_tx = nodes[0].node.channel_state.lock().unwrap().by_id.get(&chan_1.2).unwrap().last_local_commitment_txn.clone(); + nodes[1].chain_monitor.block_connected_with_filtering(&Block { header, txdata: vec![commitment_tx[0].clone()]}, 1); + let events = nodes[1].node.get_and_clear_pending_msg_events(); + assert_eq!(events.len(), 1); + match events[0] { + MessageSendEvent::BroadcastChannelUpdate { .. } => {}, + _ => panic!("Unexpected event"), + } + let node_txn = nodes[1].tx_broadcaster.txn_broadcasted.lock().unwrap().clone(); + + // Verify that A's ChannelManager is able to extract preimage from preimage tx and pass it backward + nodes[0].chain_monitor.block_connected_with_filtering(&Block { header, txdata: node_txn }, 1); + let events = nodes[0].node.get_and_clear_pending_msg_events(); + assert_eq!(events.len(), 1); + match events[0] { + MessageSendEvent::BroadcastChannelUpdate { .. } => {}, + _ => panic!("Unexpected event"), + } + } + #[test] fn test_htlc_ignore_latest_remote_commitment() { // Test that HTLC transactions spending the latest remote commitment transaction are simply diff --git a/src/ln/channelmonitor.rs b/src/ln/channelmonitor.rs index 92c95efc6..ca8f20bba 100644 --- a/src/ln/channelmonitor.rs +++ b/src/ln/channelmonitor.rs @@ -39,7 +39,7 @@ use util::ser::{ReadableArgs, Readable, Writer, Writeable, WriterWriteAdaptor, U use util::sha2::Sha256; use util::{byte_utils, events}; -use std::collections::HashMap; +use std::collections::{HashMap, hash_map}; use std::sync::{Arc,Mutex}; use std::{hash,cmp, mem}; @@ -100,6 +100,10 @@ pub trait ManyChannelMonitor: Send + Sync { /// ChainWatchInterfaces such that the provided monitor receives block_connected callbacks with /// any spends of it. fn add_update_monitor(&self, funding_txo: OutPoint, monitor: ChannelMonitor) -> Result<(), ChannelMonitorUpdateErr>; + + /// Used by ChannelManager to get list of HTLC resolved onchain and which needed to be updated + /// with success or failure backward + fn fetch_pending_htlc_updated(&self) -> Vec<([u8; 32], Option<[u8; 32]>, Option)>; } /// A simple implementation of a ManyChannelMonitor and ChainListener. Can be used to create a @@ -121,6 +125,7 @@ pub struct SimpleManyChannelMonitor { chain_monitor: Arc, broadcaster: Arc, pending_events: Mutex>, + pending_htlc_updated: Mutex, Option)>>>, logger: Arc, } @@ -128,20 +133,91 @@ impl ChainListener for SimpleManyChannelMonit fn block_connected(&self, header: &BlockHeader, height: u32, txn_matched: &[&Transaction], _indexes_of_txn_matched: &[u32]) { let block_hash = header.bitcoin_hash(); let mut new_events: Vec = Vec::with_capacity(0); + let mut htlc_updated_infos = Vec::new(); { let mut monitors = self.monitors.lock().unwrap(); for monitor in monitors.values_mut() { - let (txn_outputs, spendable_outputs) = monitor.block_connected(txn_matched, height, &block_hash, &*self.broadcaster); + let (txn_outputs, spendable_outputs, mut htlc_updated) = monitor.block_connected(txn_matched, height, &block_hash, &*self.broadcaster); if spendable_outputs.len() > 0 { new_events.push(events::Event::SpendableOutputs { outputs: spendable_outputs, }); } + for (ref txid, ref outputs) in txn_outputs { for (idx, output) in outputs.iter().enumerate() { self.chain_monitor.install_watch_outpoint((txid.clone(), idx as u32), &output.script_pubkey); } } + htlc_updated_infos.append(&mut htlc_updated); + } + } + { + let mut monitors = self.monitors.lock().unwrap(); + for htlc in &htlc_updated_infos { + if htlc.1.is_some() { + for monitor in monitors.values_mut() { + let our_short_channel_id; + match monitor.key_storage { + Storage::Local { ref short_channel_id, .. } => { + our_short_channel_id = *short_channel_id.as_ref().unwrap(); + }, + Storage::Watchtower { .. } => { + unimplemented!(); + } + } + if let Some(ref htlc_source) = htlc.0 { + match htlc_source { + &HTLCSource::PreviousHopData(ref source) => { + if source.short_channel_id == our_short_channel_id { + monitor.provide_payment_preimage(&htlc.2, &htlc.1.unwrap()); + // We maybe call again same monitor, to be sure that in case of 2 remote commitment tx from different channels + // in same block we claim well HTLCs on downstream one + // txn_outputs and htlc_data are there irrelevant + let (_, spendable_outputs, _) = monitor.block_connected(txn_matched, height, &block_hash, &*self.broadcaster); + if spendable_outputs.len() > 0 { + new_events.push(events::Event::SpendableOutputs { + outputs: spendable_outputs, + }); + } + break; + } + }, + &HTLCSource::OutboundRoute { .. } => { + // No hop backward ! + } + } + } + } + } + } + } + { + // ChannelManager will just need to fetch pending_htlc_updated and pass state backward + let mut pending_htlc_updated = self.pending_htlc_updated.lock().unwrap(); + for htlc in htlc_updated_infos.drain(..) { + match pending_htlc_updated.entry(htlc.2) { + hash_map::Entry::Occupied(mut e) => { + // In case of reorg we may have htlc outputs solved in a different way so + // Vacant or Occupied we update key-value with last state of tx resolvation + // We need also to keep only one state per-htlc so prune old one in case of + // block re-scan + e.get_mut().retain(|htlc_data| { + if let Some(ref new_htlc_source) = htlc.0 { + if let Some(ref old_htlc_source) = htlc_data.1 { + if new_htlc_source == old_htlc_source{ + return false + } + } + } + true + }); + e.get_mut().push((htlc.1, htlc.0)); + } + hash_map::Entry::Vacant(e) => { + e.insert(vec![(htlc.1, htlc.0)]); + } + } } } let mut pending_events = self.pending_events.lock().unwrap(); @@ -160,6 +236,7 @@ impl SimpleManyChannelMonitor chain_monitor, broadcaster, pending_events: Mutex::new(Vec::new()), + pending_htlc_updated: Mutex::new(HashMap::new()), logger, }); let weak_res = Arc::downgrade(&res); @@ -206,6 +283,17 @@ impl ManyChannelMonitor for SimpleManyChannelMonitor { Err(_) => Err(ChannelMonitorUpdateErr::PermanentFailure), } } + + fn fetch_pending_htlc_updated(&self) -> Vec<([u8; 32], Option<[u8; 32]>, Option)> { + let mut updated = self.pending_htlc_updated.lock().unwrap(); + let mut pending_htlcs_updated = Vec::with_capacity(updated.len()); + for (k, v) in updated.drain() { + for htlc_data in v { + pending_htlcs_updated.push((k, htlc_data.0, htlc_data.1)); + } + } + pending_htlcs_updated + } } impl events::EventsProvider for SimpleManyChannelMonitor { @@ -1608,9 +1696,10 @@ impl ChannelMonitor { } } - fn block_connected(&mut self, txn_matched: &[&Transaction], height: u32, block_hash: &Sha256dHash, broadcaster: &BroadcasterInterface)-> (Vec<(Sha256dHash, Vec)>, Vec) { + fn block_connected(&mut self, txn_matched: &[&Transaction], height: u32, block_hash: &Sha256dHash, broadcaster: &BroadcasterInterface)-> (Vec<(Sha256dHash, Vec)>, Vec, Vec<(Option, Option<[u8 ; 32]>, [u8; 32])>) { let mut watch_outputs = Vec::new(); let mut spendable_outputs = Vec::new(); + let mut htlc_updated = Vec::new(); for tx in txn_matched { if tx.input.len() == 1 { // Assuming our keys were not leaked (in which case we're screwed no matter what), @@ -1661,6 +1750,10 @@ impl ChannelMonitor { for tx in txn.iter() { broadcaster.broadcast_transaction(tx); } + let mut updated = self.is_resolving_output(tx); + if updated.len() > 0 { + htlc_updated.append(&mut updated); + } } } if let Some(ref cur_local_tx) = self.current_local_signed_commitment_tx { @@ -1691,7 +1784,7 @@ impl ChannelMonitor { } } self.last_block_hash = block_hash.clone(); - (watch_outputs, spendable_outputs) + (watch_outputs, spendable_outputs, htlc_updated) } pub(super) fn would_broadcast_at_height(&self, height: u32) -> bool { @@ -1725,6 +1818,73 @@ impl ChannelMonitor { } false } + + pub(crate) fn is_resolving_output(&mut self, tx: &Transaction) -> Vec<(Option, Option<[u8;32]>, [u8;32])> { + let mut htlc_updated = Vec::new(); + + let commitment_number = 0xffffffffffff - ((((tx.input[0].sequence as u64 & 0xffffff) << 3*8) | (tx.lock_time as u64 & 0xffffff)) ^ self.commitment_transaction_number_obscure_factor); + if commitment_number >= self.get_min_seen_secret() { + if let Some(ref current_local_signed_commitment_tx) = self.current_local_signed_commitment_tx { + for htlc_output in ¤t_local_signed_commitment_tx.htlc_outputs { + htlc_updated.push((htlc_output.3.clone(), None, htlc_output.0.payment_hash.clone())) + } + } + if let Some(ref prev_local_signed_commitment_tx) = self.prev_local_signed_commitment_tx { + for htlc_output in &prev_local_signed_commitment_tx.htlc_outputs { + htlc_updated.push((htlc_output.3.clone(), None, htlc_output.0.payment_hash.clone())) + } + } + // No need to check remote_claimabe_outpoints, symmetric HTLCSource must be present as per-htlc data on local commitment tx + } else if tx.input.len() > 0{ + for input in &tx.input { + let mut payment_data: (Option, Option<[u8;32]>, Option<[u8;32]>) = (None, None, None); + if let Some(ref current_local_signed_commitment_tx) = self.current_local_signed_commitment_tx { + if input.previous_output.txid == current_local_signed_commitment_tx.txid { + for htlc_output in ¤t_local_signed_commitment_tx.htlc_outputs { + if input.previous_output.vout == htlc_output.0.transaction_output_index { + payment_data = (htlc_output.3.clone(), None, Some(htlc_output.0.payment_hash.clone())); + } + } + } + } + if let Some(ref prev_local_signed_commitment_tx) = self.prev_local_signed_commitment_tx { + if input.previous_output.txid == prev_local_signed_commitment_tx.txid { + for htlc_output in &prev_local_signed_commitment_tx.htlc_outputs { + if input.previous_output.vout == htlc_output.0.transaction_output_index { + payment_data = (htlc_output.3.clone(), None, Some(htlc_output.0.payment_hash.clone())); + } + } + } + } + if let Some(htlc_outputs) = self.remote_claimable_outpoints.get(&input.previous_output.txid) { + for htlc_output in htlc_outputs { + if input.previous_output.vout == htlc_output.0.transaction_output_index { + payment_data = (htlc_output.1.clone(), None, Some(htlc_output.0.payment_hash.clone())); + } + } + } + // If tx isn't solving htlc output from local/remote commitment tx and htlc isn't outbound we don't need + // to broadcast solving backward + if payment_data.0.is_some() && payment_data.2.is_some() { + let mut payment_preimage = [0; 32]; + let mut preimage = None; + if input.witness.len() == 5 && input.witness[4].len() == 138 { + for (arr, vec) in payment_preimage.iter_mut().zip(tx.input[0].witness[3].iter()) { + *arr = *vec; + } + preimage = Some(payment_preimage); + } else if input.witness.len() == 3 && input.witness[2].len() == 133 { + for (arr, vec) in payment_preimage.iter_mut().zip(tx.input[0].witness[1].iter()) { + *arr = *vec; + } + preimage = Some(payment_preimage); + } + htlc_updated.push((payment_data.0, preimage, payment_data.2.unwrap())); + } + } + } + htlc_updated + } } const MAX_ALLOC_SIZE: usize = 64*1024; diff --git a/src/util/test_utils.rs b/src/util/test_utils.rs index 33fb63b5c..4a5aa60d3 100644 --- a/src/util/test_utils.rs +++ b/src/util/test_utils.rs @@ -4,6 +4,7 @@ use chain::transaction::OutPoint; use ln::channelmonitor; use ln::msgs; use ln::msgs::{HandleError}; +use ln::channelmanager::HTLCSource; use util::events; use util::logger::{Logger, Level, Record}; use util::ser::{ReadableArgs, Writer}; @@ -64,6 +65,10 @@ impl channelmonitor::ManyChannelMonitor for TestChannelMonitor { assert!(self.simple_monitor.add_update_monitor(funding_txo, monitor).is_ok()); self.update_ret.lock().unwrap().clone() } + + fn fetch_pending_htlc_updated(&self) -> Vec<([u8; 32], Option<[u8; 32]>, Option)> { + return self.simple_monitor.fetch_pending_htlc_updated(); + } } pub struct TestBroadcaster { -- 2.39.5