From: Matt Corallo <649246+TheBlueMatt@users.noreply.github.com> Date: Mon, 5 Apr 2021 22:12:45 +0000 (+0000) Subject: Merge pull request #838 from TheBlueMatt/2021-03-skip-blocks X-Git-Tag: v0.0.14~38 X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=commitdiff_plain;h=e23c270720df2798c3e35e2ba804d98060d76d17;hp=-c;p=rust-lightning Merge pull request #838 from TheBlueMatt/2021-03-skip-blocks Make `Channel`'s block connection API more electrum-friendly --- e23c270720df2798c3e35e2ba804d98060d76d17 diff --combined fuzz/src/chanmon_consistency.rs index e7cd9900,55c2ffd5..87b95cf2 --- a/fuzz/src/chanmon_consistency.rs +++ b/fuzz/src/chanmon_consistency.rs @@@ -234,7 -234,7 +234,7 @@@ fn check_api_err(api_err: APIError) _ if err.starts_with("Cannot send value that would put our balance under counterparty-announced channel reserve value") => {}, _ if err.starts_with("Cannot send value that would overdraw remaining funds.") => {}, _ if err.starts_with("Cannot send value that would not leave enough to pay for fees.") => {}, - _ => panic!(err), + _ => panic!("{}", err), } }, APIError::MonitorUpdateFailed => { @@@ -435,11 -435,11 +435,11 @@@ pub fn do_test = channel_txn.iter().enumerate().map(|(i, tx)| (i + 1, tx)).collect(); - $node.block_connected(&header, &txdata, 1); - for i in 2..100 { + $node.transactions_confirmed(&header, 1, &txdata); + for _ in 2..100 { header = BlockHeader { version: 0x20000000, prev_blockhash: header.block_hash(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 }; - $node.block_connected(&header, &[], i); } + $node.update_best_block(&header, 99); } } } diff --combined lightning-persister/src/lib.rs index 44586422,4976f929..af11dfd1 --- a/lightning-persister/src/lib.rs +++ b/lightning-persister/src/lib.rs @@@ -3,9 -3,6 +3,9 @@@ #![deny(broken_intra_doc_links)] #![deny(missing_docs)] +#![cfg_attr(all(test, feature = "unstable"), feature(test))] +#[cfg(all(test, feature = "unstable"))] extern crate test; + mod util; extern crate lightning; @@@ -244,7 -241,7 +244,7 @@@ mod tests // Force close because cooperative close doesn't result in any persisted // updates. nodes[0].node.force_close_channel(&nodes[0].node.list_channels()[0].channel_id).unwrap(); - check_closed_broadcast!(nodes[0], false); + check_closed_broadcast!(nodes[0], true); check_added_monitors!(nodes[0], 1); let node_txn = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap(); @@@ -252,7 -249,7 +252,7 @@@ let header = BlockHeader { version: 0x20000000, prev_blockhash: nodes[0].best_block_hash(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 }; connect_block(&nodes[1], &Block { header, txdata: vec![node_txn[0].clone(), node_txn[0].clone()]}); - check_closed_broadcast!(nodes[1], false); + check_closed_broadcast!(nodes[1], true); check_added_monitors!(nodes[1], 1); // Make sure everything is persisted as expected after close. @@@ -333,15 -330,3 +333,15 @@@ added_monitors.clear(); } } + +#[cfg(all(test, feature = "unstable"))] +pub mod bench { + use test::Bencher; + + #[bench] + fn bench_sends(bench: &mut Bencher) { + let persister_a = super::FilesystemPersister::new("bench_filesystem_persister_a".to_string()); + let persister_b = super::FilesystemPersister::new("bench_filesystem_persister_b".to_string()); + lightning::ln::channelmanager::bench::bench_two_sends(bench, persister_a, persister_b); + } +} diff --combined lightning/src/ln/channelmanager.rs index 19fdd97a,31c8dcca..efb2fe9c --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@@ -434,7 -434,6 +434,7 @@@ pub struct ChannelManager>, our_network_key: SecretKey, + our_network_pubkey: PublicKey, /// Used to track the last value sent in a node_announcement "timestamp" field. We ensure this /// value increases strictly since we don't assume access to a time source. @@@ -823,6 -822,7 +823,6 @@@ impl) -> Result<(), APIError> { + fn force_close_channel_with_peer(&self, channel_id: &[u8; 32], peer_node_id: Option<&PublicKey>) -> Result { let mut chan = { let mut channel_state_lock = self.channel_state.lock().unwrap(); let channel_state = &mut *channel_state_lock; if let hash_map::Entry::Occupied(chan) = channel_state.by_id.entry(channel_id.clone()) { if let Some(node_id) = peer_node_id { if chan.get().get_counterparty_node_id() != *node_id { - // Error or Ok here doesn't matter - the result is only exposed publicly - // when peer_node_id is None anyway. - return Ok(()); + return Err(APIError::ChannelUnavailable{err: "No such channel".to_owned()}); } } if let Some(short_id) = chan.get().get_short_channel_id() { @@@ -1033,14 -1029,27 +1031,27 @@@ }); } - Ok(()) + Ok(chan.get_counterparty_node_id()) } /// Force closes a channel, immediately broadcasting the latest local commitment transaction to /// the chain and rejecting new HTLCs on the given channel. Fails if channel_id is unknown to the manager. pub fn force_close_channel(&self, channel_id: &[u8; 32]) -> Result<(), APIError> { let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier); - self.force_close_channel_with_peer(channel_id, None) + match self.force_close_channel_with_peer(channel_id, None) { + Ok(counterparty_node_id) => { + self.channel_state.lock().unwrap().pending_msg_events.push( + events::MessageSendEvent::HandleError { + node_id: counterparty_node_id, + action: msgs::ErrorAction::SendErrorMessage { + msg: msgs::ErrorMessage { channel_id: *channel_id, data: "Channel force-closed".to_owned() } + }, + } + ); + Ok(()) + }, + Err(e) => Err(e) + } } /// Force close all channels, immediately broadcasting the latest local commitment transaction @@@ -2317,7 -2326,7 +2328,7 @@@ /// Gets the node_id held by this ChannelManager pub fn get_our_node_id(&self) -> PublicKey { - PublicKey::from_secret_key(&self.secp_ctx, &self.our_network_key) + self.our_network_pubkey.clone() } /// Restores a single, given channel to normal operation after a @@@ -3196,6 -3205,12 +3207,12 @@@ msg: update }); } + pending_msg_events.push(events::MessageSendEvent::HandleError { + node_id: chan.get_counterparty_node_id(), + action: msgs::ErrorAction::SendErrorMessage { + msg: msgs::ErrorMessage { channel_id: chan.channel_id(), data: "Channel force-closed".to_owned() } + }, + }); } }, } @@@ -3278,12 -3293,26 +3295,26 @@@ wher L::Target: Logger, { fn block_connected(&self, block: &Block, height: u32) { + assert_eq!(*self.last_block_hash.read().unwrap(), block.header.prev_blockhash, + "Blocks must be connected in chain-order - the connected header must build on the last connected header"); + assert_eq!(self.latest_block_height.load(Ordering::Acquire) as u64, height as u64 - 1, + "Blocks must be connected in chain-order - the connected block height must be one greater than the previous height"); let txdata: Vec<_> = block.txdata.iter().enumerate().collect(); - ChannelManager::block_connected(self, &block.header, &txdata, height); + self.transactions_confirmed(&block.header, height, &txdata); + self.update_best_block(&block.header, height); } - fn block_disconnected(&self, header: &BlockHeader, _height: u32) { - ChannelManager::block_disconnected(self, header); + fn block_disconnected(&self, header: &BlockHeader, height: u32) { + assert_eq!(*self.last_block_hash.read().unwrap(), header.block_hash(), + "Blocks must be disconnected in chain-order - the disconnected header must be the last connected header"); + + let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier); + let new_height = self.latest_block_height.fetch_sub(1, Ordering::AcqRel) as u32 - 1; + assert_eq!(new_height, height - 1, + "Blocks must be disconnected in chain-order - the disconnected block must have the correct height"); + *self.last_block_hash.write().unwrap() = header.prev_blockhash; + + self.do_chain_event(new_height, |channel| channel.update_best_block(new_height, header.time)); } } @@@ -3294,22 -3323,11 +3325,11 @@@ impl) -> Result<(Option, Vec<(HTLCSource, PaymentHash)>), msgs::ErrorMessage>> + (&self, height: u32, f: FN) { // Note that we MUST NOT end up calling methods on self.chain_monitor here - we're called // during initialization prior to the chain_monitor being fully configured in some cases. // See the docs for `ChannelManagerReadArgs` for more. - let block_hash = header.block_hash(); - log_trace!(self.logger, "Block {} at height {} connected", block_hash, height); - - let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier); - - assert_eq!(*self.last_block_hash.read().unwrap(), header.prev_blockhash, - "Blocks must be connected in chain-order - the connected header must build on the last connected header"); - assert_eq!(self.latest_block_height.load(Ordering::Acquire) as u64, height as u64 - 1, - "Blocks must be connected in chain-order - the connected header must build on the last connected header"); - self.latest_block_height.store(height as usize, Ordering::Release); - *self.last_block_hash.write().unwrap() = block_hash; let mut failed_channels = Vec::new(); let mut timed_out_htlcs = Vec::new(); @@@ -3319,7 -3337,7 +3339,7 @@@ let short_to_id = &mut channel_state.short_to_id; let pending_msg_events = &mut channel_state.pending_msg_events; channel_state.by_id.retain(|_, channel| { - let res = channel.block_connected(header, txdata, height); + let res = f(channel); if let Ok((chan_res, mut timed_out_pending_htlcs)) = res { for (source, payment_hash) in timed_out_pending_htlcs.drain(..) { let chan_update = self.get_channel_update(&channel).map(|u| u.encode_with_len()).unwrap(); // Cannot add/recv HTLCs before we have a short_id so unwrap is safe @@@ -3345,32 -3363,23 +3365,23 @@@ short_to_id.insert(channel.get_short_channel_id().unwrap(), channel.channel_id()); } } else if let Err(e) = res { + if let Some(short_id) = channel.get_short_channel_id() { + short_to_id.remove(&short_id); + } + // It looks like our counterparty went on-chain or funding transaction was + // reorged out of the main chain. Close the channel. + failed_channels.push(channel.force_shutdown(true)); + if let Ok(update) = self.get_channel_update(&channel) { + pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { + msg: update + }); + } pending_msg_events.push(events::MessageSendEvent::HandleError { node_id: channel.get_counterparty_node_id(), action: msgs::ErrorAction::SendErrorMessage { msg: e }, }); return false; } - if let Some(funding_txo) = channel.get_funding_txo() { - for &(_, tx) in txdata.iter() { - for inp in tx.input.iter() { - if inp.previous_output == funding_txo.into_bitcoin_outpoint() { - log_trace!(self.logger, "Detected channel-closing tx {} spending {}:{}, closing channel {}", tx.txid(), inp.previous_output.txid, inp.previous_output.vout, log_bytes!(channel.channel_id())); - if let Some(short_id) = channel.get_short_channel_id() { - short_to_id.remove(&short_id); - } - // It looks like our counterparty went on-chain. Close the channel. - failed_channels.push(channel.force_shutdown(true)); - if let Ok(update) = self.get_channel_update(&channel) { - pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { - msg: update - }); - } - return false; - } - } - } - } true }); @@@ -3399,6 -3408,64 +3410,64 @@@ for (source, payment_hash, reason) in timed_out_htlcs.drain(..) { self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), source, &payment_hash, reason); } + } + + /// Updates channel state to take note of transactions which were confirmed in the given block + /// at the given height. + /// + /// Note that you must still call (or have called) [`update_best_block`] with the block + /// information which is included here. + /// + /// This method may be called before or after [`update_best_block`] for a given block's + /// transaction data and may be called multiple times with additional transaction data for a + /// given block. + /// + /// This method may be called for a previous block after an [`update_best_block`] call has + /// been made for a later block, however it must *not* be called with transaction data from a + /// block which is no longer in the best chain (ie where [`update_best_block`] has already + /// been informed about a blockchain reorganization which no longer includes the block which + /// corresponds to `header`). + /// + /// [`update_best_block`]: `Self::update_best_block` + pub fn transactions_confirmed(&self, header: &BlockHeader, height: u32, txdata: &TransactionData) { + // Note that we MUST NOT end up calling methods on self.chain_monitor here - we're called + // during initialization prior to the chain_monitor being fully configured in some cases. + // See the docs for `ChannelManagerReadArgs` for more. + + let block_hash = header.block_hash(); + log_trace!(self.logger, "{} transactions included in block {} at height {} provided", txdata.len(), block_hash, height); + + let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier); + self.do_chain_event(height, |channel| channel.transactions_confirmed(&block_hash, height, txdata, &self.logger).map(|a| (a, Vec::new()))); + } + + /// Updates channel state with the current best blockchain tip. You should attempt to call this + /// quickly after a new block becomes available, however if multiple new blocks become + /// available at the same time, only a single `update_best_block()` call needs to be made. + /// + /// This method should also be called immediately after any block disconnections, once at the + /// reorganization fork point, and once with the new chain tip. Calling this method at the + /// blockchain reorganization fork point ensures we learn when a funding transaction which was + /// previously confirmed is reorganized out of the blockchain, ensuring we do not continue to + /// accept payments which cannot be enforced on-chain. + /// + /// In both the block-connection and block-disconnection case, this method may be called either + /// once per block connected or disconnected, or simply at the fork point and new tip(s), + /// skipping any intermediary blocks. + pub fn update_best_block(&self, header: &BlockHeader, height: u32) { + // Note that we MUST NOT end up calling methods on self.chain_monitor here - we're called + // during initialization prior to the chain_monitor being fully configured in some cases. + // See the docs for `ChannelManagerReadArgs` for more. + + let block_hash = header.block_hash(); + log_trace!(self.logger, "New best block: {} at height {}", block_hash, height); + + let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier); + + self.latest_block_height.store(height as usize, Ordering::Release); + *self.last_block_hash.write().unwrap() = block_hash; + + self.do_chain_event(height, |channel| channel.update_best_block(height, header.time)); loop { // Update last_node_announcement_serial to be the max of its current value and the @@@ -3414,48 -3481,6 +3483,6 @@@ } } - /// Updates channel state based on a disconnected block. - /// - /// If necessary, the channel may be force-closed without letting the counterparty participate - /// in the shutdown. - pub fn block_disconnected(&self, header: &BlockHeader) { - // Note that we MUST NOT end up calling methods on self.chain_monitor here - we're called - // during initialization prior to the chain_monitor being fully configured in some cases. - // See the docs for `ChannelManagerReadArgs` for more. - let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier); - - assert_eq!(*self.last_block_hash.read().unwrap(), header.block_hash(), - "Blocks must be disconnected in chain-order - the disconnected header must be the last connected header"); - self.latest_block_height.fetch_sub(1, Ordering::AcqRel); - *self.last_block_hash.write().unwrap() = header.prev_blockhash; - - let mut failed_channels = Vec::new(); - { - let mut channel_lock = self.channel_state.lock().unwrap(); - let channel_state = &mut *channel_lock; - let short_to_id = &mut channel_state.short_to_id; - let pending_msg_events = &mut channel_state.pending_msg_events; - channel_state.by_id.retain(|_, v| { - if v.block_disconnected(header) { - if let Some(short_id) = v.get_short_channel_id() { - short_to_id.remove(&short_id); - } - failed_channels.push(v.force_shutdown(true)); - if let Ok(update) = self.get_channel_update(&v) { - pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { - msg: update - }); - } - false - } else { - true - } - }); - } - - self.handle_init_event_channel_failures(failed_channels); - } - /// Blocks until ChannelManager needs to be persisted or a timeout is reached. It returns a bool /// indicating whether persistence is necessary. Only one listener on /// `await_persistable_update` or `await_persistable_update_timeout` is guaranteed to be woken @@@ -4320,6 -4345,7 +4347,6 @@@ impl<'a, Signer: Sign, M: Deref, T: Der latest_block_height: AtomicUsize::new(latest_block_height as usize), last_block_hash: RwLock::new(last_block_hash), - secp_ctx, channel_state: Mutex::new(ChannelHolder { by_id, @@@ -4329,8 -4355,6 +4356,8 @@@ pending_msg_events: Vec::new(), }), our_network_key: args.keys_manager.get_node_secret(), + our_network_pubkey: PublicKey::from_secret_key(&secp_ctx, &args.keys_manager.get_node_secret()), + secp_ctx, last_node_announcement_serial: AtomicUsize::new(last_node_announcement_serial as usize), @@@ -4407,154 -4431,3 +4434,154 @@@ mod tests } } } + +#[cfg(all(any(test, feature = "_test_utils"), feature = "unstable"))] +pub mod bench { + use chain::Listen; + use chain::chainmonitor::ChainMonitor; + use chain::channelmonitor::Persist; + use chain::keysinterface::{KeysManager, InMemorySigner}; + use chain::transaction::OutPoint; + use ln::channelmanager::{ChainParameters, ChannelManager, PaymentHash, PaymentPreimage}; + use ln::features::InitFeatures; + use ln::functional_test_utils::*; + use ln::msgs::ChannelMessageHandler; + use routing::network_graph::NetworkGraph; + use routing::router::get_route; + use util::test_utils; + use util::config::UserConfig; + use util::events::{Event, EventsProvider, MessageSendEvent, MessageSendEventsProvider}; + + use bitcoin::hashes::Hash; + use bitcoin::hashes::sha256::Hash as Sha256; + use bitcoin::{Block, BlockHeader, Transaction, TxOut}; + + use std::sync::Mutex; + + use test::Bencher; + + struct NodeHolder<'a, P: Persist> { + node: &'a ChannelManager, + &'a test_utils::TestBroadcaster, &'a KeysManager, + &'a test_utils::TestFeeEstimator, &'a test_utils::TestLogger> + } + + #[cfg(test)] + #[bench] + fn bench_sends(bench: &mut Bencher) { + bench_two_sends(bench, test_utils::TestPersister::new(), test_utils::TestPersister::new()); + } + + pub fn bench_two_sends>(bench: &mut Bencher, persister_a: P, persister_b: P) { + // Do a simple benchmark of sending a payment back and forth between two nodes. + // Note that this is unrealistic as each payment send will require at least two fsync + // calls per node. + let network = bitcoin::Network::Testnet; + let genesis_hash = bitcoin::blockdata::constants::genesis_block(network).header.block_hash(); + + let tx_broadcaster = test_utils::TestBroadcaster{txn_broadcasted: Mutex::new(Vec::new())}; + let fee_estimator = test_utils::TestFeeEstimator { sat_per_kw: 253 }; + + let mut config: UserConfig = Default::default(); + config.own_channel_config.minimum_depth = 1; + + let logger_a = test_utils::TestLogger::with_id("node a".to_owned()); + let chain_monitor_a = ChainMonitor::new(None, &tx_broadcaster, &logger_a, &fee_estimator, &persister_a); + let seed_a = [1u8; 32]; + let keys_manager_a = KeysManager::new(&seed_a, 42, 42); + let node_a = ChannelManager::new(&fee_estimator, &chain_monitor_a, &tx_broadcaster, &logger_a, &keys_manager_a, config.clone(), ChainParameters { + network, + latest_hash: genesis_hash, + latest_height: 0, + }); + let node_a_holder = NodeHolder { node: &node_a }; + + let logger_b = test_utils::TestLogger::with_id("node a".to_owned()); + let chain_monitor_b = ChainMonitor::new(None, &tx_broadcaster, &logger_a, &fee_estimator, &persister_b); + let seed_b = [2u8; 32]; + let keys_manager_b = KeysManager::new(&seed_b, 42, 42); + let node_b = ChannelManager::new(&fee_estimator, &chain_monitor_b, &tx_broadcaster, &logger_b, &keys_manager_b, config.clone(), ChainParameters { + network, + latest_hash: genesis_hash, + latest_height: 0, + }); + let node_b_holder = NodeHolder { node: &node_b }; + + node_a.create_channel(node_b.get_our_node_id(), 8_000_000, 100_000_000, 42, None).unwrap(); + node_b.handle_open_channel(&node_a.get_our_node_id(), InitFeatures::known(), &get_event_msg!(node_a_holder, MessageSendEvent::SendOpenChannel, node_b.get_our_node_id())); + node_a.handle_accept_channel(&node_b.get_our_node_id(), InitFeatures::known(), &get_event_msg!(node_b_holder, MessageSendEvent::SendAcceptChannel, node_a.get_our_node_id())); + + let tx; + if let Event::FundingGenerationReady { temporary_channel_id, output_script, .. } = get_event!(node_a_holder, Event::FundingGenerationReady) { + tx = Transaction { version: 2, lock_time: 0, input: Vec::new(), output: vec![TxOut { + value: 8_000_000, script_pubkey: output_script, + }]}; + let funding_outpoint = OutPoint { txid: tx.txid(), index: 0 }; + node_a.funding_transaction_generated(&temporary_channel_id, funding_outpoint); + } else { panic!(); } + + node_b.handle_funding_created(&node_a.get_our_node_id(), &get_event_msg!(node_a_holder, MessageSendEvent::SendFundingCreated, node_b.get_our_node_id())); + node_a.handle_funding_signed(&node_b.get_our_node_id(), &get_event_msg!(node_b_holder, MessageSendEvent::SendFundingSigned, node_a.get_our_node_id())); + + get_event!(node_a_holder, Event::FundingBroadcastSafe); + + let block = Block { + header: BlockHeader { version: 0x20000000, prev_blockhash: genesis_hash, merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 }, + txdata: vec![tx], + }; + Listen::block_connected(&node_a, &block, 1); + Listen::block_connected(&node_b, &block, 1); + + node_a.handle_funding_locked(&node_b.get_our_node_id(), &get_event_msg!(node_b_holder, MessageSendEvent::SendFundingLocked, node_a.get_our_node_id())); + node_b.handle_funding_locked(&node_a.get_our_node_id(), &get_event_msg!(node_a_holder, MessageSendEvent::SendFundingLocked, node_b.get_our_node_id())); + + let dummy_graph = NetworkGraph::new(genesis_hash); + + macro_rules! send_payment { + ($node_a: expr, $node_b: expr) => { + let usable_channels = $node_a.list_usable_channels(); + let route = get_route(&$node_a.get_our_node_id(), &dummy_graph, &$node_b.get_our_node_id(), None, Some(&usable_channels.iter().map(|r| r).collect::>()), &[], 10_000, TEST_FINAL_CLTV, &logger_a).unwrap(); + + let payment_preimage = PaymentPreimage([0; 32]); + let payment_hash = PaymentHash(Sha256::hash(&payment_preimage.0[..]).into_inner()); + + $node_a.send_payment(&route, payment_hash, &None).unwrap(); + let payment_event = SendEvent::from_event($node_a.get_and_clear_pending_msg_events().pop().unwrap()); + $node_b.handle_update_add_htlc(&$node_a.get_our_node_id(), &payment_event.msgs[0]); + $node_b.handle_commitment_signed(&$node_a.get_our_node_id(), &payment_event.commitment_msg); + let (raa, cs) = get_revoke_commit_msgs!(NodeHolder { node: &$node_b }, $node_a.get_our_node_id()); + $node_a.handle_revoke_and_ack(&$node_b.get_our_node_id(), &raa); + $node_a.handle_commitment_signed(&$node_b.get_our_node_id(), &cs); + $node_b.handle_revoke_and_ack(&$node_a.get_our_node_id(), &get_event_msg!(NodeHolder { node: &$node_a }, MessageSendEvent::SendRevokeAndACK, $node_b.get_our_node_id())); + + expect_pending_htlcs_forwardable!(NodeHolder { node: &$node_b }); + expect_payment_received!(NodeHolder { node: &$node_b }, payment_hash, 10_000); + assert!($node_b.claim_funds(payment_preimage, &None, 10_000)); + + match $node_b.get_and_clear_pending_msg_events().pop().unwrap() { + MessageSendEvent::UpdateHTLCs { node_id, updates } => { + assert_eq!(node_id, $node_a.get_our_node_id()); + $node_a.handle_update_fulfill_htlc(&$node_b.get_our_node_id(), &updates.update_fulfill_htlcs[0]); + $node_a.handle_commitment_signed(&$node_b.get_our_node_id(), &updates.commitment_signed); + }, + _ => panic!("Failed to generate claim event"), + } + + let (raa, cs) = get_revoke_commit_msgs!(NodeHolder { node: &$node_a }, $node_b.get_our_node_id()); + $node_b.handle_revoke_and_ack(&$node_a.get_our_node_id(), &raa); + $node_b.handle_commitment_signed(&$node_a.get_our_node_id(), &cs); + $node_a.handle_revoke_and_ack(&$node_b.get_our_node_id(), &get_event_msg!(NodeHolder { node: &$node_b }, MessageSendEvent::SendRevokeAndACK, $node_a.get_our_node_id())); + + expect_payment_sent!(NodeHolder { node: &$node_a }, payment_preimage); + } + } + + bench.iter(|| { + send_payment!(node_a, node_b); + send_payment!(node_b, node_a); + }); + } +} diff --combined lightning/src/ln/functional_test_utils.rs index 3c641dd7,ed1585f6..af11ef5d --- a/lightning/src/ln/functional_test_utils.rs +++ b/lightning/src/ln/functional_test_utils.rs @@@ -10,7 -10,7 +10,7 @@@ //! A bunch of useful utilities for building networks of nodes and exchanging messages between //! nodes for functional tests. - use chain::Watch; + use chain::{Listen, Watch}; use chain::channelmonitor::ChannelMonitor; use chain::transaction::OutPoint; use ln::channelmanager::{ChainParameters, ChannelManager, ChannelManagerReadArgs, RAACommitmentOrder, PaymentPreimage, PaymentHash, PaymentSecret, PaymentSendFailure}; @@@ -60,21 -60,15 +60,15 @@@ pub fn mine_transaction<'a, 'b, 'c, 'd> /// Mine the given transaction at the given height, mining blocks as required to build to that /// height pub fn confirm_transaction_at<'a, 'b, 'c, 'd>(node: &'a Node<'b, 'c, 'd>, tx: &Transaction, conf_height: u32) { - let starting_block = node.best_block_info(); + let first_connect_height = node.best_block_info().1 + 1; + assert!(first_connect_height <= conf_height); + if conf_height - first_connect_height >= 1 { + connect_blocks(node, conf_height - first_connect_height); + } let mut block = Block { - header: BlockHeader { version: 0x20000000, prev_blockhash: starting_block.0, merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 }, + header: BlockHeader { version: 0x20000000, prev_blockhash: node.best_block_hash(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 }, txdata: Vec::new(), }; - let height = starting_block.1 + 1; - assert!(height <= conf_height); - for _ in height..conf_height { - connect_block(node, &block); - block = Block { - header: BlockHeader { version: 0x20000000, prev_blockhash: block.header.block_hash(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 }, - txdata: vec![], - }; - } - for _ in 0..*node.network_chan_count.borrow() { // Make sure we don't end up with channels at the same short id by offsetting by chan_count block.txdata.push(Transaction { version: 0, lock_time: 0, input: Vec::new(), output: Vec::new() }); } @@@ -82,37 -76,94 +76,94 @@@ connect_block(node, &block); } + /// The possible ways we may notify a ChannelManager of a new block + pub enum ConnectStyle { + /// Calls update_best_block first, detecting transactions in the block only after receiving the + /// header and height information. + BestBlockFirst, + /// The same as BestBlockFirst, however when we have multiple blocks to connect, we only + /// make a single update_best_block call. + BestBlockFirstSkippingBlocks, + /// Calls transactions_confirmed first, detecting transactions in the block before updating the + /// header and height information. + TransactionsFirst, + /// The same as TransactionsFirst, however when we have multiple blocks to connect, we only + /// make a single update_best_block call. + TransactionsFirstSkippingBlocks, + /// Provides the full block via the chain::Listen interface. In the current code this is + /// equivalent to TransactionsFirst with some additional assertions. + FullBlockViaListen, + } + pub fn connect_blocks<'a, 'b, 'c, 'd>(node: &'a Node<'b, 'c, 'd>, depth: u32) -> BlockHash { + let skip_intermediaries = match *node.connect_style.borrow() { + ConnectStyle::BestBlockFirstSkippingBlocks|ConnectStyle::TransactionsFirstSkippingBlocks => true, + _ => false, + }; + let mut block = Block { header: BlockHeader { version: 0x2000000, prev_blockhash: node.best_block_hash(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 }, txdata: vec![], }; - connect_block(node, &block); - for _ in 2..depth + 1 { + assert!(depth >= 1); + for _ in 0..depth - 1 { + do_connect_block(node, &block, skip_intermediaries); block = Block { header: BlockHeader { version: 0x20000000, prev_blockhash: block.header.block_hash(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 }, txdata: vec![], }; - connect_block(node, &block); } + connect_block(node, &block); block.header.block_hash() } pub fn connect_block<'a, 'b, 'c, 'd>(node: &'a Node<'b, 'c, 'd>, block: &Block) { + do_connect_block(node, block, false); + } + + fn do_connect_block<'a, 'b, 'c, 'd>(node: &'a Node<'b, 'c, 'd>, block: &Block, skip_manager: bool) { let txdata: Vec<_> = block.txdata.iter().enumerate().collect(); let height = node.best_block_info().1 + 1; node.chain_monitor.chain_monitor.block_connected(&block.header, &txdata, height); - node.node.block_connected(&block.header, &txdata, height); + if !skip_manager { + match *node.connect_style.borrow() { + ConnectStyle::BestBlockFirst|ConnectStyle::BestBlockFirstSkippingBlocks => { + node.node.update_best_block(&block.header, height); + node.node.transactions_confirmed(&block.header, height, &block.txdata.iter().enumerate().collect::>()); + }, + ConnectStyle::TransactionsFirst|ConnectStyle::TransactionsFirstSkippingBlocks => { + node.node.transactions_confirmed(&block.header, height, &block.txdata.iter().enumerate().collect::>()); + node.node.update_best_block(&block.header, height); + }, + ConnectStyle::FullBlockViaListen => { + Listen::block_connected(node.node, &block, height); + } + } + } node.node.test_process_background_events(); node.blocks.borrow_mut().push((block.header, height)); } pub fn disconnect_blocks<'a, 'b, 'c, 'd>(node: &'a Node<'b, 'c, 'd>, count: u32) { - for _ in 0..count { + for i in 0..count { let orig_header = node.blocks.borrow_mut().pop().unwrap(); assert!(orig_header.1 > 0); // Cannot disconnect genesis + let prev_header = node.blocks.borrow().last().unwrap().clone(); + node.chain_monitor.chain_monitor.block_disconnected(&orig_header.0, orig_header.1); - node.node.block_disconnected(&orig_header.0); + match *node.connect_style.borrow() { + ConnectStyle::FullBlockViaListen => { + Listen::block_disconnected(node.node, &orig_header.0, orig_header.1); + }, + ConnectStyle::BestBlockFirstSkippingBlocks|ConnectStyle::TransactionsFirstSkippingBlocks => { + if i == count - 1 { + node.node.update_best_block(&prev_header.0, prev_header.1); + } + }, + _ => { + node.node.update_best_block(&prev_header.0, prev_header.1); + }, + } } } @@@ -152,6 -203,7 +203,7 @@@ pub struct Node<'a, 'b: 'a, 'c: 'b> pub network_chan_count: Rc>, pub logger: &'c test_utils::TestLogger, pub blocks: RefCell>, + pub connect_style: Rc>, } impl<'a, 'b, 'c> Node<'a, 'b, 'c> { pub fn best_block_hash(&self) -> BlockHash { @@@ -313,24 -365,6 +365,24 @@@ macro_rules! get_event_msg } } +/// Get a specific event from the pending events queue. +#[macro_export] +macro_rules! get_event { + ($node: expr, $event_type: path) => { + { + let mut events = $node.node.get_and_clear_pending_events(); + assert_eq!(events.len(), 1); + let ev = events.pop().unwrap(); + match ev { + $event_type { .. } => { + ev + }, + _ => panic!("Unexpected event"), + } + } + } +} + #[cfg(test)] macro_rules! get_htlc_update_msgs { ($node: expr, $node_id: expr) => { @@@ -359,8 -393,7 +411,8 @@@ macro_rules! get_feerate } } -#[cfg(test)] +/// Returns any local commitment transactions for the channel. +#[macro_export] macro_rules! get_local_commitment_txn { ($node: expr, $channel_id: expr) => { { @@@ -866,7 -899,7 +918,7 @@@ macro_rules! expect_pending_htlcs_forwa }} } -#[cfg(test)] +#[cfg(any(test, feature = "unstable"))] macro_rules! expect_payment_received { ($node: expr, $expected_payment_hash: expr, $expected_recv_value: expr) => { let events = $node.node.get_and_clear_pending_events(); @@@ -1243,6 -1276,7 +1295,7 @@@ pub fn create_network<'a, 'b: 'a, 'c: ' let mut nodes = Vec::new(); let chan_count = Rc::new(RefCell::new(0)); let payment_count = Rc::new(RefCell::new(0)); + let connect_style = Rc::new(RefCell::new(ConnectStyle::FullBlockViaListen)); for i in 0..node_count { let net_graph_msg_handler = NetGraphMsgHandler::new(cfgs[i].chain_source.genesis_hash, None, cfgs[i].logger); @@@ -1251,7 -1285,8 +1304,8 @@@ keys_manager: &cfgs[i].keys_manager, node: &chan_mgrs[i], net_graph_msg_handler, node_seed: cfgs[i].node_seed, network_chan_count: chan_count.clone(), network_payment_count: payment_count.clone(), logger: cfgs[i].logger, - blocks: RefCell::new(vec![(genesis_block(Network::Testnet).header, 0)]) + blocks: RefCell::new(vec![(genesis_block(Network::Testnet).header, 0)]), + connect_style: Rc::clone(&connect_style), }) } @@@ -1364,22 -1399,36 +1418,36 @@@ pub fn check_preimage_claim<'a, 'b, 'c> pub fn get_announce_close_broadcast_events<'a, 'b, 'c>(nodes: &Vec>, a: usize, b: usize) { let events_1 = nodes[a].node.get_and_clear_pending_msg_events(); - assert_eq!(events_1.len(), 1); + assert_eq!(events_1.len(), 2); let as_update = match events_1[0] { MessageSendEvent::BroadcastChannelUpdate { ref msg } => { msg.clone() }, _ => panic!("Unexpected event"), }; + match events_1[1] { + MessageSendEvent::HandleError { node_id, action: msgs::ErrorAction::SendErrorMessage { ref msg } } => { + assert_eq!(node_id, nodes[b].node.get_our_node_id()); + assert_eq!(msg.data, "Commitment or closing transaction was confirmed on chain."); + }, + _ => panic!("Unexpected event"), + } let events_2 = nodes[b].node.get_and_clear_pending_msg_events(); - assert_eq!(events_2.len(), 1); + assert_eq!(events_2.len(), 2); let bs_update = match events_2[0] { MessageSendEvent::BroadcastChannelUpdate { ref msg } => { msg.clone() }, _ => panic!("Unexpected event"), }; + match events_2[1] { + MessageSendEvent::HandleError { node_id, action: msgs::ErrorAction::SendErrorMessage { ref msg } } => { + assert_eq!(node_id, nodes[a].node.get_our_node_id()); + assert_eq!(msg.data, "Commitment or closing transaction was confirmed on chain."); + }, + _ => panic!("Unexpected event"), + } for node in nodes { node.net_graph_msg_handler.handle_channel_update(&as_update).unwrap();