X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning%2Fsrc%2Fchain%2Fchainmonitor.rs;h=3d84fdf93a52f391200fde732b0954e77e01d724;hb=12687d75d5a0eba4bc691998e51a5313c715e559;hp=aae260e735bdbae5c0a538af8024e7e2ef2a78cb;hpb=132b07239784749dcadfc2959a9487107789df56;p=rust-lightning diff --git a/lightning/src/chain/chainmonitor.rs b/lightning/src/chain/chainmonitor.rs index aae260e7..3d84fdf9 100644 --- a/lightning/src/chain/chainmonitor.rs +++ b/lightning/src/chain/chainmonitor.rs @@ -43,6 +43,7 @@ use prelude::*; use sync::{RwLock, RwLockReadGuard, Mutex, MutexGuard}; use core::ops::Deref; use core::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; +use bitcoin::secp256k1::PublicKey; #[derive(Clone, Copy, Hash, PartialEq, Eq)] /// A specific update's ID stored in a `MonitorUpdateId`, separated out to make the contents @@ -235,7 +236,7 @@ pub struct ChainMonitor>, + pending_monitor_events: Mutex, Option)>>, /// The best block height seen, used as a proxy for the passage of time. highest_chain_height: AtomicUsize, } @@ -262,82 +263,67 @@ where C::Target: chain::Filter, where FN: Fn(&ChannelMonitor, &TransactionData) -> Vec { - let mut dependent_txdata = Vec::new(); - { - let monitor_states = self.monitors.write().unwrap(); - if let Some(height) = best_height { - // If the best block height is being updated, update highest_chain_height under the - // monitors write lock. - let old_height = self.highest_chain_height.load(Ordering::Acquire); - let new_height = height as usize; - if new_height > old_height { - self.highest_chain_height.store(new_height, Ordering::Release); - } + let monitor_states = self.monitors.write().unwrap(); + if let Some(height) = best_height { + // If the best block height is being updated, update highest_chain_height under the + // monitors write lock. + let old_height = self.highest_chain_height.load(Ordering::Acquire); + let new_height = height as usize; + if new_height > old_height { + self.highest_chain_height.store(new_height, Ordering::Release); } + } - for (funding_outpoint, monitor_state) in monitor_states.iter() { - let monitor = &monitor_state.monitor; - let mut txn_outputs; - { - txn_outputs = process(monitor, txdata); - let update_id = MonitorUpdateId { - contents: UpdateOrigin::ChainSync(self.sync_persistence_id.get_increment()), - }; - let mut pending_monitor_updates = monitor_state.pending_monitor_updates.lock().unwrap(); - if let Some(height) = best_height { - if !monitor_state.has_pending_chainsync_updates(&pending_monitor_updates) { - // If there are not ChainSync persists awaiting completion, go ahead and - // set last_chain_persist_height here - we wouldn't want the first - // TemporaryFailure to always immediately be considered "overly delayed". - monitor_state.last_chain_persist_height.store(height as usize, Ordering::Release); - } + for (funding_outpoint, monitor_state) in monitor_states.iter() { + let monitor = &monitor_state.monitor; + let mut txn_outputs; + { + txn_outputs = process(monitor, txdata); + let update_id = MonitorUpdateId { + contents: UpdateOrigin::ChainSync(self.sync_persistence_id.get_increment()), + }; + let mut pending_monitor_updates = monitor_state.pending_monitor_updates.lock().unwrap(); + if let Some(height) = best_height { + if !monitor_state.has_pending_chainsync_updates(&pending_monitor_updates) { + // If there are not ChainSync persists awaiting completion, go ahead and + // set last_chain_persist_height here - we wouldn't want the first + // TemporaryFailure to always immediately be considered "overly delayed". + monitor_state.last_chain_persist_height.store(height as usize, Ordering::Release); } + } - log_trace!(self.logger, "Syncing Channel Monitor for channel {}", log_funding_info!(monitor)); - match self.persister.update_persisted_channel(*funding_outpoint, &None, monitor, update_id) { - Ok(()) => - log_trace!(self.logger, "Finished syncing Channel Monitor for channel {}", log_funding_info!(monitor)), - Err(ChannelMonitorUpdateErr::PermanentFailure) => { - monitor_state.channel_perm_failed.store(true, Ordering::Release); - self.pending_monitor_events.lock().unwrap().push(MonitorEvent::UpdateFailed(*funding_outpoint)); - }, - Err(ChannelMonitorUpdateErr::TemporaryFailure) => { - log_debug!(self.logger, "Channel Monitor sync for channel {} in progress, holding events until completion!", log_funding_info!(monitor)); - pending_monitor_updates.push(update_id); - }, - } + log_trace!(self.logger, "Syncing Channel Monitor for channel {}", log_funding_info!(monitor)); + match self.persister.update_persisted_channel(*funding_outpoint, &None, monitor, update_id) { + Ok(()) => + log_trace!(self.logger, "Finished syncing Channel Monitor for channel {}", log_funding_info!(monitor)), + Err(ChannelMonitorUpdateErr::PermanentFailure) => { + monitor_state.channel_perm_failed.store(true, Ordering::Release); + self.pending_monitor_events.lock().unwrap().push((*funding_outpoint, vec![MonitorEvent::UpdateFailed(*funding_outpoint)], monitor.get_counterparty_node_id())); + }, + Err(ChannelMonitorUpdateErr::TemporaryFailure) => { + log_debug!(self.logger, "Channel Monitor sync for channel {} in progress, holding events until completion!", log_funding_info!(monitor)); + pending_monitor_updates.push(update_id); + }, } + } - // Register any new outputs with the chain source for filtering, storing any dependent - // transactions from within the block that previously had not been included in txdata. - if let Some(ref chain_source) = self.chain_source { - let block_hash = header.block_hash(); - for (txid, mut outputs) in txn_outputs.drain(..) { - for (idx, output) in outputs.drain(..) { - // Register any new outputs with the chain source for filtering and recurse - // if it indicates that there are dependent transactions within the block - // that had not been previously included in txdata. - let output = WatchedOutput { - block_hash: Some(block_hash), - outpoint: OutPoint { txid, index: idx as u16 }, - script_pubkey: output.script_pubkey, - }; - if let Some(tx) = chain_source.register_output(output) { - dependent_txdata.push(tx); - } - } + // Register any new outputs with the chain source for filtering, storing any dependent + // transactions from within the block that previously had not been included in txdata. + if let Some(ref chain_source) = self.chain_source { + let block_hash = header.block_hash(); + for (txid, mut outputs) in txn_outputs.drain(..) { + for (idx, output) in outputs.drain(..) { + // Register any new outputs with the chain source for filtering + let output = WatchedOutput { + block_hash: Some(block_hash), + outpoint: OutPoint { txid, index: idx as u16 }, + script_pubkey: output.script_pubkey, + }; + chain_source.register_output(output) } } } } - - // Recursively call for any dependent transactions that were identified by the chain source. - if !dependent_txdata.is_empty() { - dependent_txdata.sort_unstable_by_key(|(index, _tx)| *index); - dependent_txdata.dedup_by_key(|(index, _tx)| *index); - let txdata: Vec<_> = dependent_txdata.iter().map(|(index, tx)| (*index, tx)).collect(); - self.process_chain_data(header, None, &txdata, process); // We skip the best height the second go-around - } } /// Creates a new `ChainMonitor` used to watch on-chain activity pertaining to channels. @@ -455,10 +441,10 @@ where C::Target: chain::Filter, // UpdateCompleted event. return Ok(()); } - self.pending_monitor_events.lock().unwrap().push(MonitorEvent::UpdateCompleted { + self.pending_monitor_events.lock().unwrap().push((funding_txo, vec![MonitorEvent::UpdateCompleted { funding_txo, monitor_update_id: monitor_data.monitor.get_latest_update_id(), - }); + }], monitor_data.monitor.get_counterparty_node_id())); }, MonitorUpdateId { contents: UpdateOrigin::ChainSync(_) } => { if !monitor_data.has_pending_chainsync_updates(&pending_monitor_updates) { @@ -476,10 +462,12 @@ where C::Target: chain::Filter, /// channel_monitor_updated once with the highest ID. #[cfg(any(test, fuzzing))] pub fn force_channel_monitor_updated(&self, funding_txo: OutPoint, monitor_update_id: u64) { - self.pending_monitor_events.lock().unwrap().push(MonitorEvent::UpdateCompleted { + let monitors = self.monitors.read().unwrap(); + let counterparty_node_id = monitors.get(&funding_txo).and_then(|m| m.monitor.get_counterparty_node_id()); + self.pending_monitor_events.lock().unwrap().push((funding_txo, vec![MonitorEvent::UpdateCompleted { funding_txo, monitor_update_id, - }); + }], counterparty_node_id)); } #[cfg(any(test, fuzzing, feature = "_test_utils"))] @@ -636,7 +624,7 @@ where C::Target: chain::Filter, Some(monitor_state) => { let monitor = &monitor_state.monitor; log_trace!(self.logger, "Updating ChannelMonitor for channel {}", log_funding_info!(monitor)); - let update_res = monitor.update_monitor(&update, &self.broadcaster, &self.fee_estimator, &self.logger); + let update_res = monitor.update_monitor(&update, &self.broadcaster, &*self.fee_estimator, &self.logger); if update_res.is_err() { log_error!(self.logger, "Failed to update ChannelMonitor for channel {}.", log_funding_info!(monitor)); } @@ -666,7 +654,7 @@ where C::Target: chain::Filter, } } - fn release_pending_monitor_events(&self) -> Vec { + fn release_pending_monitor_events(&self) -> Vec<(OutPoint, Vec, Option)> { let mut pending_monitor_events = self.pending_monitor_events.lock().unwrap().split_off(0); for monitor_state in self.monitors.read().unwrap().values() { let is_pending_monitor_update = monitor_state.has_pending_chainsync_updates(&monitor_state.pending_monitor_updates.lock().unwrap()); @@ -692,7 +680,12 @@ where C::Target: chain::Filter, log_error!(self.logger, " To avoid funds-loss, we are allowing monitor updates to be released."); log_error!(self.logger, " This may cause duplicate payment events to be generated."); } - pending_monitor_events.append(&mut monitor_state.monitor.get_and_clear_pending_monitor_events()); + let monitor_events = monitor_state.monitor.get_and_clear_pending_monitor_events(); + if monitor_events.len() > 0 { + let monitor_outpoint = monitor_state.monitor.get_funding_txo().0; + let counterparty_node_id = monitor_state.monitor.get_counterparty_node_id(); + pending_monitor_events.push((monitor_outpoint, monitor_events, counterparty_node_id)); + } } } pending_monitor_events @@ -725,9 +718,10 @@ impl even #[cfg(test)] mod tests { - use bitcoin::BlockHeader; + use bitcoin::{BlockHeader, TxMerkleNode}; + use bitcoin::hashes::Hash; use ::{check_added_monitors, check_closed_broadcast, check_closed_event}; - use ::{expect_payment_sent, expect_payment_sent_without_paths, expect_payment_path_successful, get_event_msg}; + use ::{expect_payment_sent, expect_payment_claimed, expect_payment_sent_without_paths, expect_payment_path_successful, get_event_msg}; use ::{get_htlc_update_msgs, get_local_commitment_txn, get_revoke_commit_msgs, get_route_and_payment_hash, unwrap_send_err}; use chain::{ChannelMonitorUpdateErr, Confirm, Watch}; use chain::channelmonitor::LATENCY_GRACE_PERIOD_BLOCKS; @@ -737,50 +731,6 @@ mod tests { use ln::msgs::ChannelMessageHandler; use util::errors::APIError; use util::events::{ClosureReason, MessageSendEvent, MessageSendEventsProvider}; - use util::test_utils::{OnRegisterOutput, TxOutReference}; - - /// Tests that in-block dependent transactions are processed by `block_connected` when not - /// included in `txdata` but returned by [`chain::Filter::register_output`]. For instance, - /// a (non-anchor) commitment transaction's HTLC output may be spent in the same block as the - /// commitment transaction itself. An Electrum client may filter the commitment transaction but - /// needs to return the HTLC transaction so it can be processed. - #[test] - fn connect_block_checks_dependent_transactions() { - let chanmon_cfgs = create_chanmon_cfgs(2); - let node_cfgs = create_node_cfgs(2, &chanmon_cfgs); - let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]); - let nodes = create_network(2, &node_cfgs, &node_chanmgrs); - let channel = create_announced_chan_between_nodes( - &nodes, 0, 1, InitFeatures::known(), InitFeatures::known()); - - // Send a payment, saving nodes[0]'s revoked commitment and HTLC-Timeout transactions. - let (commitment_tx, htlc_tx) = { - let payment_preimage = route_payment(&nodes[0], &vec!(&nodes[1])[..], 5_000_000).0; - let mut txn = get_local_commitment_txn!(nodes[0], channel.2); - claim_payment(&nodes[0], &vec!(&nodes[1])[..], payment_preimage); - - assert_eq!(txn.len(), 2); - (txn.remove(0), txn.remove(0)) - }; - - // Set expectations on nodes[1]'s chain source to return dependent transactions. - let htlc_output = TxOutReference(commitment_tx.clone(), 0); - let to_local_output = TxOutReference(commitment_tx.clone(), 1); - let htlc_timeout_output = TxOutReference(htlc_tx.clone(), 0); - nodes[1].chain_source - .expect(OnRegisterOutput { with: htlc_output, returns: Some((1, htlc_tx)) }) - .expect(OnRegisterOutput { with: to_local_output, returns: None }) - .expect(OnRegisterOutput { with: htlc_timeout_output, returns: None }); - - // Notify nodes[1] that nodes[0]'s revoked commitment transaction was mined. The chain - // source should return the dependent HTLC transaction when the HTLC output is registered. - mine_transaction(&nodes[1], &commitment_tx); - - // Clean up so uninteresting assertions don't fail. - check_added_monitors!(nodes[1], 1); - nodes[1].node.get_and_clear_pending_msg_events(); - nodes[1].node.get_and_clear_pending_events(); - } #[test] fn test_async_ooo_offchain_updates() { @@ -794,16 +744,18 @@ mod tests { create_announced_chan_between_nodes(&nodes, 0, 1, InitFeatures::known(), InitFeatures::known()); // Route two payments to be claimed at the same time. - let payment_preimage_1 = route_payment(&nodes[0], &[&nodes[1]], 1_000_000).0; - let payment_preimage_2 = route_payment(&nodes[0], &[&nodes[1]], 1_000_000).0; + let (payment_preimage_1, payment_hash_1, _) = route_payment(&nodes[0], &[&nodes[1]], 1_000_000); + let (payment_preimage_2, payment_hash_2, _) = route_payment(&nodes[0], &[&nodes[1]], 1_000_000); chanmon_cfgs[1].persister.offchain_monitor_updates.lock().unwrap().clear(); chanmon_cfgs[1].persister.set_update_ret(Err(ChannelMonitorUpdateErr::TemporaryFailure)); nodes[1].node.claim_funds(payment_preimage_1); check_added_monitors!(nodes[1], 1); + expect_payment_claimed!(nodes[1], payment_hash_1, 1_000_000); nodes[1].node.claim_funds(payment_preimage_2); check_added_monitors!(nodes[1], 1); + expect_payment_claimed!(nodes[1], payment_hash_2, 1_000_000); chanmon_cfgs[1].persister.set_update_ret(Ok(())); @@ -873,8 +825,9 @@ mod tests { let (route, second_payment_hash, _, second_payment_secret) = get_route_and_payment_hash!(nodes[0], nodes[1], 100_000); // First route a payment that we will claim on chain and give the recipient the preimage. - let payment_preimage = route_payment(&nodes[0], &[&nodes[1]], 1_000_000).0; + let (payment_preimage, payment_hash, _) = route_payment(&nodes[0], &[&nodes[1]], 1_000_000); nodes[1].node.claim_funds(payment_preimage); + expect_payment_claimed!(nodes[1], payment_hash, 1_000_000); nodes[1].node.get_and_clear_pending_msg_events(); check_added_monitors!(nodes[1], 1); let remote_txn = get_local_commitment_txn!(nodes[1], channel.2); @@ -889,7 +842,7 @@ mod tests { let new_header = BlockHeader { version: 2, time: 0, bits: 0, nonce: 0, prev_blockhash: nodes[0].best_block_info().0, - merkle_root: Default::default() }; + merkle_root: TxMerkleNode::all_zeros() }; nodes[0].chain_monitor.chain_monitor.transactions_confirmed(&new_header, &[(0, &remote_txn[0]), (1, &remote_txn[1])], nodes[0].best_block_info().1 + 1); assert!(nodes[0].chain_monitor.release_pending_monitor_events().is_empty()); @@ -915,7 +868,7 @@ mod tests { let latest_header = BlockHeader { version: 2, time: 0, bits: 0, nonce: 0, prev_blockhash: nodes[0].best_block_info().0, - merkle_root: Default::default() }; + merkle_root: TxMerkleNode::all_zeros() }; nodes[0].chain_monitor.chain_monitor.best_block_updated(&latest_header, nodes[0].best_block_info().1 + LATENCY_GRACE_PERIOD_BLOCKS); } else { let persistences = chanmon_cfgs[0].persister.chain_sync_monitor_persistences.lock().unwrap().clone();