From: Matt Corallo <649246+TheBlueMatt@users.noreply.github.com> Date: Thu, 2 Apr 2020 20:06:00 +0000 (+0000) Subject: Merge pull request #568 from jkczyz/2020-03-handle-error-deadlock X-Git-Tag: v0.0.12~91 X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=commitdiff_plain;h=f0b037ce146d147d2dcfbb9610d2fa84ef8b539a;hp=-c;p=rust-lightning Merge pull request #568 from jkczyz/2020-03-handle-error-deadlock Fix deadlock in ChannelManager's handle_error!() --- f0b037ce146d147d2dcfbb9610d2fa84ef8b539a diff --combined lightning/src/ln/channelmanager.rs index 1afce1f7,e9dfe5a5..a471ca3f --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@@ -467,21 -467,41 +467,41 @@@ pub struct ChannelDetails } macro_rules! handle_error { - ($self: ident, $internal: expr, $their_node_id: expr, $locked_channel_state: expr) => { + ($self: ident, $internal: expr, $their_node_id: expr) => { match $internal { Ok(msg) => Ok(msg), Err(MsgHandleErrInternal { err, shutdown_finish }) => { + #[cfg(debug_assertions)] + { + // In testing, ensure there are no deadlocks where the lock is already held upon + // entering the macro. + assert!($self.channel_state.try_lock().is_ok()); + } + + let mut msg_events = Vec::with_capacity(2); + if let Some((shutdown_res, update_option)) = shutdown_finish { $self.finish_force_close_channel(shutdown_res); if let Some(update) = update_option { - $locked_channel_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { + msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { msg: update }); } } + log_error!($self, "{}", err.err); if let msgs::ErrorAction::IgnoreError = err.action { - } else { $locked_channel_state.pending_msg_events.push(events::MessageSendEvent::HandleError { node_id: $their_node_id, action: err.action.clone() }); } + } else { + msg_events.push(events::MessageSendEvent::HandleError { + node_id: $their_node_id, + action: err.action.clone() + }); + } + + if !msg_events.is_empty() { + $self.channel_state.lock().unwrap().pending_msg_events.append(&mut msg_events); + } + // Return error in case higher-API need one Err(err) }, @@@ -1191,9 -1211,8 +1211,8 @@@ impl = loop { - + let mut channel_lock = self.channel_state.lock().unwrap(); let id = match channel_lock.short_to_id.get(&route.hops.first().unwrap().short_channel_id) { None => return Err(APIError::ChannelUnavailable{err: "No channel available with first hop!"}), Some(id) => id.clone(), @@@ -1242,7 -1261,7 +1261,7 @@@ return Ok(()); }; - match handle_error!(self, err, route.hops.first().unwrap().pubkey, channel_lock) { + match handle_error!(self, err, route.hops.first().unwrap().pubkey) { Ok(_) => unreachable!(), Err(e) => { Err(APIError::ChannelUnavailable { err: e.err }) } } @@@ -1261,8 -1280,7 +1280,7 @@@ let _ = self.total_consistency_lock.read().unwrap(); let (mut chan, msg, chan_monitor) = { - let mut channel_state = self.channel_state.lock().unwrap(); - let (res, chan) = match channel_state.by_id.remove(temporary_channel_id) { + let (res, chan) = match self.channel_state.lock().unwrap().by_id.remove(temporary_channel_id) { Some(mut chan) => { (chan.get_outbound_funding_created(funding_txo) .map_err(|e| if let ChannelError::Close(msg) = e { @@@ -1272,7 -1290,7 +1290,7 @@@ }, None => return }; - match handle_error!(self, res, chan.get_their_node_id(), channel_state) { + match handle_error!(self, res, chan.get_their_node_id()) { Ok(funding_msg) => { (chan, funding_msg.0, funding_msg.1) }, @@@ -1284,12 -1302,9 +1302,9 @@@ if let Err(e) = self.monitor.add_monitor(chan_monitor.get_funding_txo().unwrap(), chan_monitor) { match e { ChannelMonitorUpdateErr::PermanentFailure => { - { - let mut channel_state = self.channel_state.lock().unwrap(); - match handle_error!(self, Err(MsgHandleErrInternal::from_finish_shutdown("ChannelMonitor storage failure", *temporary_channel_id, chan.force_shutdown(true), None)), chan.get_their_node_id(), channel_state) { - Err(_) => { return; }, - Ok(()) => unreachable!(), - } + match handle_error!(self, Err(MsgHandleErrInternal::from_finish_shutdown("ChannelMonitor storage failure", *temporary_channel_id, chan.force_shutdown(true), None)), chan.get_their_node_id()) { + Err(_) => { return; }, + Ok(()) => unreachable!(), } }, ChannelMonitorUpdateErr::TemporaryFailure => { @@@ -1520,10 -1535,8 +1535,8 @@@ }, ChannelError::CloseDelayBroadcast { .. } => { panic!("Wait is only generated on receipt of channel_reestablish, which is handled by try_chan_entry, we don't bother to support it here"); } }; - match handle_error!(self, err, their_node_id, channel_state) { - Ok(_) => unreachable!(), - Err(_) => { continue; }, - } + handle_errors.push((their_node_id, err)); + continue; } }; if let Err(e) = self.monitor.update_monitor(chan.get().get_funding_txo().unwrap(), monitor_update) { @@@ -1579,11 -1592,8 +1592,8 @@@ }; } - if handle_errors.len() > 0 { - let mut channel_state_lock = self.channel_state.lock().unwrap(); - for (their_node_id, err) in handle_errors.drain(..) { - let _ = handle_error!(self, err, their_node_id, channel_state_lock); - } + for (their_node_id, err) in handle_errors.drain(..) { + let _ = handle_error!(self, err, their_node_id); } if new_events.is_empty() { return } @@@ -1835,7 -1845,8 +1845,8 @@@ return; }; - let _ = handle_error!(self, err, their_node_id, channel_state_lock); + mem::drop(channel_state_lock); + let _ = handle_error!(self, err, their_node_id); } /// Gets the node_id held by this ChannelManager @@@ -2579,9 -2590,9 +2590,9 @@@ #[doc(hidden)] pub fn update_fee(&self, channel_id: [u8;32], feerate_per_kw: u64) -> Result<(), APIError> { let _ = self.total_consistency_lock.read().unwrap(); - let mut channel_state_lock = self.channel_state.lock().unwrap(); let their_node_id; let err: Result<(), _> = loop { + let mut channel_state_lock = self.channel_state.lock().unwrap(); let channel_state = &mut *channel_state_lock; match channel_state.by_id.entry(channel_id) { @@@ -2620,7 -2631,7 +2631,7 @@@ return Ok(()) }; - match handle_error!(self, err, their_node_id, channel_state_lock) { + match handle_error!(self, err, their_node_id) { Ok(_) => unreachable!(), Err(e) => { Err(APIError::APIMisuseError { err: e.err })} } @@@ -2830,146 -2841,82 +2841,82 @@@ impl monitor.get_latest_update_id() { + // If the channel is ahead of the monitor, return InvalidValue: + return Err(DecodeError::InvalidValue); + } else if channel.get_cur_local_commitment_transaction_number() > monitor.get_cur_local_commitment_number() || + channel.get_revoked_remote_commitment_transaction_number() > monitor.get_min_seen_secret() || + channel.get_cur_remote_commitment_transaction_number() > monitor.get_cur_remote_commitment_number() || + channel.get_latest_monitor_update_id() < monitor.get_latest_update_id() { + // But if the channel is behind of the monitor, close the channel: let (_, _, mut new_failed_htlcs) = channel.force_shutdown(true); failed_htlcs.append(&mut new_failed_htlcs); monitor.broadcast_latest_local_commitment_txn(&args.tx_broadcaster); diff --combined lightning/src/ln/functional_tests.rs index 8ea9dcf7,76749c3b..eb1194c0 --- a/lightning/src/ln/functional_tests.rs +++ b/lightning/src/ln/functional_tests.rs @@@ -2988,6 -2988,76 +2988,76 @@@ fn test_commitment_revoked_fail_backwar do_test_commitment_revoked_fail_backward_exhaustive(true, false, true); } + #[test] + fn fail_backward_pending_htlc_upon_channel_failure() { + let chanmon_cfgs = create_chanmon_cfgs(2); + let node_cfgs = create_node_cfgs(2, &chanmon_cfgs); + let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]); + let mut nodes = create_network(2, &node_cfgs, &node_chanmgrs); + let chan = create_announced_chan_between_nodes_with_value(&nodes, 0, 1, 1_000_000, 500_000_000, InitFeatures::supported(), InitFeatures::supported()); + + // Alice -> Bob: Route a payment but without Bob sending revoke_and_ack. + { + let (_, payment_hash) = get_payment_preimage_hash!(nodes[0]); + let route = nodes[0].router.get_route(&nodes[1].node.get_our_node_id(), None, &Vec::new(), 50_000, TEST_FINAL_CLTV).unwrap(); + nodes[0].node.send_payment(route, payment_hash).unwrap(); + check_added_monitors!(nodes[0], 1); + + let payment_event = { + let mut events = nodes[0].node.get_and_clear_pending_msg_events(); + assert_eq!(events.len(), 1); + SendEvent::from_event(events.remove(0)) + }; + assert_eq!(payment_event.node_id, nodes[1].node.get_our_node_id()); + assert_eq!(payment_event.msgs.len(), 1); + } + + // Alice -> Bob: Route another payment but now Alice waits for Bob's earlier revoke_and_ack. + let (_, failed_payment_hash) = get_payment_preimage_hash!(nodes[0]); + { + let route = nodes[0].router.get_route(&nodes[1].node.get_our_node_id(), None, &Vec::new(), 50_000, TEST_FINAL_CLTV).unwrap(); + nodes[0].node.send_payment(route, failed_payment_hash).unwrap(); + check_added_monitors!(nodes[0], 0); + + assert!(nodes[0].node.get_and_clear_pending_msg_events().is_empty()); + } + + // Alice <- Bob: Send a malformed update_add_htlc so Alice fails the channel. + { + let route = nodes[1].router.get_route(&nodes[0].node.get_our_node_id(), None, &Vec::new(), 50_000, TEST_FINAL_CLTV).unwrap(); + let (_, payment_hash) = get_payment_preimage_hash!(nodes[1]); + + let secp_ctx = Secp256k1::new(); + let session_priv = { + let mut session_key = [0; 32]; + let mut rng = thread_rng(); + rng.fill_bytes(&mut session_key); + SecretKey::from_slice(&session_key).expect("RNG is bad!") + }; + + let current_height = nodes[1].node.latest_block_height.load(Ordering::Acquire) as u32 + 1; + let (onion_payloads, _amount_msat, cltv_expiry) = onion_utils::build_onion_payloads(&route, current_height).unwrap(); + let onion_keys = onion_utils::construct_onion_keys(&secp_ctx, &route, &session_priv).unwrap(); + let onion_routing_packet = onion_utils::construct_onion_packet(onion_payloads, onion_keys, [0; 32], &payment_hash); + + // Send a 0-msat update_add_htlc to fail the channel. + let update_add_htlc = msgs::UpdateAddHTLC { + channel_id: chan.2, + htlc_id: 0, + amount_msat: 0, + payment_hash, + cltv_expiry, + onion_routing_packet, + }; + nodes[0].node.handle_update_add_htlc(&nodes[1].node.get_our_node_id(), &update_add_htlc); + } + + // Check that Alice fails backward the pending HTLC from the second payment. + expect_payment_failed!(nodes[0], failed_payment_hash, true); + check_closed_broadcast!(nodes[0], true); + check_added_monitors!(nodes[0], 1); + } + #[test] fn test_htlc_ignore_latest_remote_commitment() { // Test that HTLC transactions spending the latest remote commitment transaction are simply @@@ -3839,13 -3909,6 +3909,13 @@@ fn test_manager_serialize_deserialize_i create_announced_chan_between_nodes(&nodes, 2, 0, InitFeatures::supported(), InitFeatures::supported()); let (_, _, channel_id, funding_tx) = create_announced_chan_between_nodes(&nodes, 0, 3, InitFeatures::supported(), InitFeatures::supported()); + let mut node_0_stale_monitors_serialized = Vec::new(); + for monitor in nodes[0].chan_monitor.simple_monitor.monitors.lock().unwrap().iter() { + let mut writer = test_utils::TestVecWriter(Vec::new()); + monitor.1.write_for_disk(&mut writer).unwrap(); + node_0_stale_monitors_serialized.push(writer.0); + } + let (our_payment_preimage, _) = route_payment(&nodes[2], &[&nodes[0], &nodes[1]], 1000000); // Serialize the ChannelManager here, but the monitor we keep up-to-date @@@ -3868,15 -3931,6 +3938,15 @@@ fee_estimator = test_utils::TestFeeEstimator { sat_per_kw: 253 }; new_chan_monitor = test_utils::TestChannelMonitor::new(nodes[0].chain_monitor.clone(), nodes[0].tx_broadcaster.clone(), Arc::new(test_utils::TestLogger::new()), &fee_estimator); nodes[0].chan_monitor = &new_chan_monitor; + + let mut node_0_stale_monitors = Vec::new(); + for serialized in node_0_stale_monitors_serialized.iter() { + let mut read = &serialized[..]; + let (_, monitor) = <(Sha256dHash, ChannelMonitor)>::read(&mut read, Arc::new(test_utils::TestLogger::new())).unwrap(); + assert!(read.is_empty()); + node_0_stale_monitors.push(monitor); + } + let mut node_0_monitors = Vec::new(); for serialized in node_0_monitors_serialized.iter() { let mut read = &serialized[..]; @@@ -3885,25 -3939,9 +3955,25 @@@ node_0_monitors.push(monitor); } - let mut nodes_0_read = &nodes_0_serialized[..]; keys_manager = test_utils::TestKeysInterface::new(&nodes[0].node_seed, Network::Testnet, Arc::new(test_utils::TestLogger::new())); - let (_, nodes_0_deserialized_tmp) = <(Sha256dHash, ChannelManager)>::read(&mut nodes_0_read, ChannelManagerReadArgs { + + let mut nodes_0_read = &nodes_0_serialized[..]; + if let Err(msgs::DecodeError::InvalidValue) = + <(Sha256dHash, ChannelManager)>::read(&mut nodes_0_read, ChannelManagerReadArgs { + default_config: UserConfig::default(), + keys_manager: &keys_manager, + fee_estimator: &fee_estimator, + monitor: nodes[0].chan_monitor, + tx_broadcaster: nodes[0].tx_broadcaster.clone(), + logger: Arc::new(test_utils::TestLogger::new()), + channel_monitors: &mut node_0_stale_monitors.iter_mut().map(|monitor| { (monitor.get_funding_txo().unwrap(), monitor) }).collect(), + }) { } else { + panic!("If the monitor(s) are stale, this indicates a bug and we should get an Err return"); + }; + + let mut nodes_0_read = &nodes_0_serialized[..]; + let (_, nodes_0_deserialized_tmp) = + <(Sha256dHash, ChannelManager)>::read(&mut nodes_0_read, ChannelManagerReadArgs { default_config: UserConfig::default(), keys_manager: &keys_manager, fee_estimator: &fee_estimator,