where
FN: Fn(&ChannelMonitor<ChannelSigner>, &TransactionData) -> Vec<TransactionOutputs>
{
- let mut dependent_txdata = Vec::new();
- {
- let monitor_states = self.monitors.write().unwrap();
- if let Some(height) = best_height {
- // If the best block height is being updated, update highest_chain_height under the
- // monitors write lock.
- let old_height = self.highest_chain_height.load(Ordering::Acquire);
- let new_height = height as usize;
- if new_height > old_height {
- self.highest_chain_height.store(new_height, Ordering::Release);
- }
+ let monitor_states = self.monitors.write().unwrap();
+ if let Some(height) = best_height {
+ // If the best block height is being updated, update highest_chain_height under the
+ // monitors write lock.
+ let old_height = self.highest_chain_height.load(Ordering::Acquire);
+ let new_height = height as usize;
+ if new_height > old_height {
+ self.highest_chain_height.store(new_height, Ordering::Release);
}
+ }
- for (funding_outpoint, monitor_state) in monitor_states.iter() {
- let monitor = &monitor_state.monitor;
- let mut txn_outputs;
- {
- txn_outputs = process(monitor, txdata);
- let update_id = MonitorUpdateId {
- contents: UpdateOrigin::ChainSync(self.sync_persistence_id.get_increment()),
- };
- let mut pending_monitor_updates = monitor_state.pending_monitor_updates.lock().unwrap();
- if let Some(height) = best_height {
- if !monitor_state.has_pending_chainsync_updates(&pending_monitor_updates) {
- // If there are not ChainSync persists awaiting completion, go ahead and
- // set last_chain_persist_height here - we wouldn't want the first
- // TemporaryFailure to always immediately be considered "overly delayed".
- monitor_state.last_chain_persist_height.store(height as usize, Ordering::Release);
- }
+ for (funding_outpoint, monitor_state) in monitor_states.iter() {
+ let monitor = &monitor_state.monitor;
+ let mut txn_outputs;
+ {
+ txn_outputs = process(monitor, txdata);
+ let update_id = MonitorUpdateId {
+ contents: UpdateOrigin::ChainSync(self.sync_persistence_id.get_increment()),
+ };
+ let mut pending_monitor_updates = monitor_state.pending_monitor_updates.lock().unwrap();
+ if let Some(height) = best_height {
+ if !monitor_state.has_pending_chainsync_updates(&pending_monitor_updates) {
+ // If there are not ChainSync persists awaiting completion, go ahead and
+ // set last_chain_persist_height here - we wouldn't want the first
+ // TemporaryFailure to always immediately be considered "overly delayed".
+ monitor_state.last_chain_persist_height.store(height as usize, Ordering::Release);
}
+ }
- log_trace!(self.logger, "Syncing Channel Monitor for channel {}", log_funding_info!(monitor));
- match self.persister.update_persisted_channel(*funding_outpoint, &None, monitor, update_id) {
- Ok(()) =>
- log_trace!(self.logger, "Finished syncing Channel Monitor for channel {}", log_funding_info!(monitor)),
- Err(ChannelMonitorUpdateErr::PermanentFailure) => {
- monitor_state.channel_perm_failed.store(true, Ordering::Release);
- self.pending_monitor_events.lock().unwrap().push((*funding_outpoint, vec![MonitorEvent::UpdateFailed(*funding_outpoint)], monitor.get_counterparty_node_id()));
- },
- Err(ChannelMonitorUpdateErr::TemporaryFailure) => {
- log_debug!(self.logger, "Channel Monitor sync for channel {} in progress, holding events until completion!", log_funding_info!(monitor));
- pending_monitor_updates.push(update_id);
- },
- }
+ log_trace!(self.logger, "Syncing Channel Monitor for channel {}", log_funding_info!(monitor));
+ match self.persister.update_persisted_channel(*funding_outpoint, &None, monitor, update_id) {
+ Ok(()) =>
+ log_trace!(self.logger, "Finished syncing Channel Monitor for channel {}", log_funding_info!(monitor)),
+ Err(ChannelMonitorUpdateErr::PermanentFailure) => {
+ monitor_state.channel_perm_failed.store(true, Ordering::Release);
+ self.pending_monitor_events.lock().unwrap().push((*funding_outpoint, vec![MonitorEvent::UpdateFailed(*funding_outpoint)], monitor.get_counterparty_node_id()));
+ },
+ Err(ChannelMonitorUpdateErr::TemporaryFailure) => {
+ log_debug!(self.logger, "Channel Monitor sync for channel {} in progress, holding events until completion!", log_funding_info!(monitor));
+ pending_monitor_updates.push(update_id);
+ },
}
+ }
- // Register any new outputs with the chain source for filtering, storing any dependent
- // transactions from within the block that previously had not been included in txdata.
- if let Some(ref chain_source) = self.chain_source {
- let block_hash = header.block_hash();
- for (txid, mut outputs) in txn_outputs.drain(..) {
- for (idx, output) in outputs.drain(..) {
- // Register any new outputs with the chain source for filtering and recurse
- // if it indicates that there are dependent transactions within the block
- // that had not been previously included in txdata.
- let output = WatchedOutput {
- block_hash: Some(block_hash),
- outpoint: OutPoint { txid, index: idx as u16 },
- script_pubkey: output.script_pubkey,
- };
- if let Some(tx) = chain_source.register_output(output) {
- dependent_txdata.push(tx);
- }
- }
+ // Register any new outputs with the chain source for filtering, storing any dependent
+ // transactions from within the block that previously had not been included in txdata.
+ if let Some(ref chain_source) = self.chain_source {
+ let block_hash = header.block_hash();
+ for (txid, mut outputs) in txn_outputs.drain(..) {
+ for (idx, output) in outputs.drain(..) {
+ // Register any new outputs with the chain source for filtering
+ let output = WatchedOutput {
+ block_hash: Some(block_hash),
+ outpoint: OutPoint { txid, index: idx as u16 },
+ script_pubkey: output.script_pubkey,
+ };
+ chain_source.register_output(output)
}
}
}
}
-
- // Recursively call for any dependent transactions that were identified by the chain source.
- if !dependent_txdata.is_empty() {
- dependent_txdata.sort_unstable_by_key(|(index, _tx)| *index);
- dependent_txdata.dedup_by_key(|(index, _tx)| *index);
- let txdata: Vec<_> = dependent_txdata.iter().map(|(index, tx)| (*index, tx)).collect();
- self.process_chain_data(header, None, &txdata, process); // We skip the best height the second go-around
- }
}
/// Creates a new `ChainMonitor` used to watch on-chain activity pertaining to channels.
use ::{get_htlc_update_msgs, get_local_commitment_txn, get_revoke_commit_msgs, get_route_and_payment_hash, unwrap_send_err};
use chain::{ChannelMonitorUpdateErr, Confirm, Watch};
use chain::channelmonitor::LATENCY_GRACE_PERIOD_BLOCKS;
- use ln::channelmanager::PaymentSendFailure;
- use ln::features::InitFeatures;
+ use ln::channelmanager::{self, PaymentSendFailure};
use ln::functional_test_utils::*;
use ln::msgs::ChannelMessageHandler;
use util::errors::APIError;
use util::events::{ClosureReason, MessageSendEvent, MessageSendEventsProvider};
- use util::test_utils::{OnRegisterOutput, TxOutReference};
-
- /// Tests that in-block dependent transactions are processed by `block_connected` when not
- /// included in `txdata` but returned by [`chain::Filter::register_output`]. For instance,
- /// a (non-anchor) commitment transaction's HTLC output may be spent in the same block as the
- /// commitment transaction itself. An Electrum client may filter the commitment transaction but
- /// needs to return the HTLC transaction so it can be processed.
- #[test]
- fn connect_block_checks_dependent_transactions() {
- let chanmon_cfgs = create_chanmon_cfgs(2);
- let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
- let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
- let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
- let channel = create_announced_chan_between_nodes(
- &nodes, 0, 1, InitFeatures::known(), InitFeatures::known());
-
- // Send a payment, saving nodes[0]'s revoked commitment and HTLC-Timeout transactions.
- let (commitment_tx, htlc_tx) = {
- let payment_preimage = route_payment(&nodes[0], &vec!(&nodes[1])[..], 5_000_000).0;
- let mut txn = get_local_commitment_txn!(nodes[0], channel.2);
- claim_payment(&nodes[0], &vec!(&nodes[1])[..], payment_preimage);
-
- assert_eq!(txn.len(), 2);
- (txn.remove(0), txn.remove(0))
- };
-
- // Set expectations on nodes[1]'s chain source to return dependent transactions.
- let htlc_output = TxOutReference(commitment_tx.clone(), 0);
- let to_local_output = TxOutReference(commitment_tx.clone(), 1);
- let htlc_timeout_output = TxOutReference(htlc_tx.clone(), 0);
- nodes[1].chain_source
- .expect(OnRegisterOutput { with: htlc_output, returns: Some((1, htlc_tx)) })
- .expect(OnRegisterOutput { with: to_local_output, returns: None })
- .expect(OnRegisterOutput { with: htlc_timeout_output, returns: None });
-
- // Notify nodes[1] that nodes[0]'s revoked commitment transaction was mined. The chain
- // source should return the dependent HTLC transaction when the HTLC output is registered.
- mine_transaction(&nodes[1], &commitment_tx);
-
- // Clean up so uninteresting assertions don't fail.
- check_added_monitors!(nodes[1], 1);
- nodes[1].node.get_and_clear_pending_msg_events();
- nodes[1].node.get_and_clear_pending_events();
- }
#[test]
fn test_async_ooo_offchain_updates() {
let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
- create_announced_chan_between_nodes(&nodes, 0, 1, InitFeatures::known(), InitFeatures::known());
+ create_announced_chan_between_nodes(&nodes, 0, 1, channelmanager::provided_init_features(), channelmanager::provided_init_features());
// Route two payments to be claimed at the same time.
let (payment_preimage_1, payment_hash_1, _) = route_payment(&nodes[0], &[&nodes[1]], 1_000_000);
let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
let channel = create_announced_chan_between_nodes(
- &nodes, 0, 1, InitFeatures::known(), InitFeatures::known());
+ &nodes, 0, 1, channelmanager::provided_init_features(), channelmanager::provided_init_features());
// Get a route for later and rebalance the channel somewhat
send_payment(&nodes[0], &[&nodes[1]], 10_000_000);
let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
- create_announced_chan_between_nodes(&nodes, 0, 1, InitFeatures::known(), InitFeatures::known());
+ create_announced_chan_between_nodes(&nodes, 0, 1, channelmanager::provided_init_features(), channelmanager::provided_init_features());
chanmon_cfgs[0].persister.chain_sync_monitor_persistences.lock().unwrap().clear();
chanmon_cfgs[0].persister.set_update_ret(Err(ChannelMonitorUpdateErr::PermanentFailure));