}
macro_rules! handle_error {
- ($self: ident, $internal: expr, $their_node_id: expr, $locked_channel_state: expr) => {
+ ($self: ident, $internal: expr, $their_node_id: expr) => {
match $internal {
Ok(msg) => Ok(msg),
Err(MsgHandleErrInternal { err, shutdown_finish }) => {
+ #[cfg(debug_assertions)]
+ {
+ // In testing, ensure there are no deadlocks where the lock is already held upon
+ // entering the macro.
+ assert!($self.channel_state.try_lock().is_ok());
+ }
+
+ let mut msg_events = Vec::with_capacity(2);
+
if let Some((shutdown_res, update_option)) = shutdown_finish {
$self.finish_force_close_channel(shutdown_res);
if let Some(update) = update_option {
- $locked_channel_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
+ msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
msg: update
});
}
}
+
log_error!($self, "{}", err.err);
if let msgs::ErrorAction::IgnoreError = err.action {
- } else { $locked_channel_state.pending_msg_events.push(events::MessageSendEvent::HandleError { node_id: $their_node_id, action: err.action.clone() }); }
+ } else {
+ msg_events.push(events::MessageSendEvent::HandleError {
+ node_id: $their_node_id,
+ action: err.action.clone()
+ });
+ }
+
+ if !msg_events.is_empty() {
+ $self.channel_state.lock().unwrap().pending_msg_events.append(&mut msg_events);
+ }
+
// Return error in case higher-API need one
Err(err)
},
let _ = self.total_consistency_lock.read().unwrap();
- let mut channel_lock = self.channel_state.lock().unwrap();
let err: Result<(), _> = loop {
-
+ let mut channel_lock = self.channel_state.lock().unwrap();
let id = match channel_lock.short_to_id.get(&route.hops.first().unwrap().short_channel_id) {
None => return Err(APIError::ChannelUnavailable{err: "No channel available with first hop!"}),
Some(id) => id.clone(),
return Ok(());
};
- match handle_error!(self, err, route.hops.first().unwrap().pubkey, channel_lock) {
+ match handle_error!(self, err, route.hops.first().unwrap().pubkey) {
Ok(_) => unreachable!(),
Err(e) => { Err(APIError::ChannelUnavailable { err: e.err }) }
}
let _ = self.total_consistency_lock.read().unwrap();
let (mut chan, msg, chan_monitor) = {
- let mut channel_state = self.channel_state.lock().unwrap();
- let (res, chan) = match channel_state.by_id.remove(temporary_channel_id) {
+ let (res, chan) = match self.channel_state.lock().unwrap().by_id.remove(temporary_channel_id) {
Some(mut chan) => {
(chan.get_outbound_funding_created(funding_txo)
.map_err(|e| if let ChannelError::Close(msg) = e {
},
None => return
};
- match handle_error!(self, res, chan.get_their_node_id(), channel_state) {
+ match handle_error!(self, res, chan.get_their_node_id()) {
Ok(funding_msg) => {
(chan, funding_msg.0, funding_msg.1)
},
if let Err(e) = self.monitor.add_monitor(chan_monitor.get_funding_txo().unwrap(), chan_monitor) {
match e {
ChannelMonitorUpdateErr::PermanentFailure => {
- {
- let mut channel_state = self.channel_state.lock().unwrap();
- match handle_error!(self, Err(MsgHandleErrInternal::from_finish_shutdown("ChannelMonitor storage failure", *temporary_channel_id, chan.force_shutdown(true), None)), chan.get_their_node_id(), channel_state) {
- Err(_) => { return; },
- Ok(()) => unreachable!(),
- }
+ match handle_error!(self, Err(MsgHandleErrInternal::from_finish_shutdown("ChannelMonitor storage failure", *temporary_channel_id, chan.force_shutdown(true), None)), chan.get_their_node_id()) {
+ Err(_) => { return; },
+ Ok(()) => unreachable!(),
}
},
ChannelMonitorUpdateErr::TemporaryFailure => {
},
ChannelError::CloseDelayBroadcast { .. } => { panic!("Wait is only generated on receipt of channel_reestablish, which is handled by try_chan_entry, we don't bother to support it here"); }
};
- match handle_error!(self, err, their_node_id, channel_state) {
- Ok(_) => unreachable!(),
- Err(_) => { continue; },
- }
+ handle_errors.push((their_node_id, err));
+ continue;
}
};
if let Err(e) = self.monitor.update_monitor(chan.get().get_funding_txo().unwrap(), monitor_update) {
};
}
- if handle_errors.len() > 0 {
- let mut channel_state_lock = self.channel_state.lock().unwrap();
- for (their_node_id, err) in handle_errors.drain(..) {
- let _ = handle_error!(self, err, their_node_id, channel_state_lock);
- }
+ for (their_node_id, err) in handle_errors.drain(..) {
+ let _ = handle_error!(self, err, their_node_id);
}
if new_events.is_empty() { return }
return;
};
- let _ = handle_error!(self, err, their_node_id, channel_state_lock);
+ mem::drop(channel_state_lock);
+ let _ = handle_error!(self, err, their_node_id);
}
/// Gets the node_id held by this ChannelManager
#[doc(hidden)]
pub fn update_fee(&self, channel_id: [u8;32], feerate_per_kw: u64) -> Result<(), APIError> {
let _ = self.total_consistency_lock.read().unwrap();
- let mut channel_state_lock = self.channel_state.lock().unwrap();
let their_node_id;
let err: Result<(), _> = loop {
+ let mut channel_state_lock = self.channel_state.lock().unwrap();
let channel_state = &mut *channel_state_lock;
match channel_state.by_id.entry(channel_id) {
return Ok(())
};
- match handle_error!(self, err, their_node_id, channel_state_lock) {
+ match handle_error!(self, err, their_node_id) {
Ok(_) => unreachable!(),
Err(e) => { Err(APIError::APIMisuseError { err: e.err })}
}
{
fn handle_open_channel(&self, their_node_id: &PublicKey, their_features: InitFeatures, msg: &msgs::OpenChannel) {
let _ = self.total_consistency_lock.read().unwrap();
- let res = self.internal_open_channel(their_node_id, their_features, msg);
- if res.is_err() {
- let mut channel_state_lock = self.channel_state.lock().unwrap();
- let _ = handle_error!(self, res, *their_node_id, channel_state_lock);
- }
+ let _ = handle_error!(self, self.internal_open_channel(their_node_id, their_features, msg), *their_node_id);
}
fn handle_accept_channel(&self, their_node_id: &PublicKey, their_features: InitFeatures, msg: &msgs::AcceptChannel) {
let _ = self.total_consistency_lock.read().unwrap();
- let res = self.internal_accept_channel(their_node_id, their_features, msg);
- if res.is_err() {
- let mut channel_state_lock = self.channel_state.lock().unwrap();
- let _ = handle_error!(self, res, *their_node_id, channel_state_lock);
- }
+ let _ = handle_error!(self, self.internal_accept_channel(their_node_id, their_features, msg), *their_node_id);
}
fn handle_funding_created(&self, their_node_id: &PublicKey, msg: &msgs::FundingCreated) {
let _ = self.total_consistency_lock.read().unwrap();
- let res = self.internal_funding_created(their_node_id, msg);
- if res.is_err() {
- let mut channel_state_lock = self.channel_state.lock().unwrap();
- let _ = handle_error!(self, res, *their_node_id, channel_state_lock);
- }
+ let _ = handle_error!(self, self.internal_funding_created(their_node_id, msg), *their_node_id);
}
fn handle_funding_signed(&self, their_node_id: &PublicKey, msg: &msgs::FundingSigned) {
let _ = self.total_consistency_lock.read().unwrap();
- let res = self.internal_funding_signed(their_node_id, msg);
- if res.is_err() {
- let mut channel_state_lock = self.channel_state.lock().unwrap();
- let _ = handle_error!(self, res, *their_node_id, channel_state_lock);
- }
+ let _ = handle_error!(self, self.internal_funding_signed(their_node_id, msg), *their_node_id);
}
fn handle_funding_locked(&self, their_node_id: &PublicKey, msg: &msgs::FundingLocked) {
let _ = self.total_consistency_lock.read().unwrap();
- let res = self.internal_funding_locked(their_node_id, msg);
- if res.is_err() {
- let mut channel_state_lock = self.channel_state.lock().unwrap();
- let _ = handle_error!(self, res, *their_node_id, channel_state_lock);
- }
+ let _ = handle_error!(self, self.internal_funding_locked(their_node_id, msg), *their_node_id);
}
fn handle_shutdown(&self, their_node_id: &PublicKey, msg: &msgs::Shutdown) {
let _ = self.total_consistency_lock.read().unwrap();
- let res = self.internal_shutdown(their_node_id, msg);
- if res.is_err() {
- let mut channel_state_lock = self.channel_state.lock().unwrap();
- let _ = handle_error!(self, res, *their_node_id, channel_state_lock);
- }
+ let _ = handle_error!(self, self.internal_shutdown(their_node_id, msg), *their_node_id);
}
fn handle_closing_signed(&self, their_node_id: &PublicKey, msg: &msgs::ClosingSigned) {
let _ = self.total_consistency_lock.read().unwrap();
- let res = self.internal_closing_signed(their_node_id, msg);
- if res.is_err() {
- let mut channel_state_lock = self.channel_state.lock().unwrap();
- let _ = handle_error!(self, res, *their_node_id, channel_state_lock);
- }
+ let _ = handle_error!(self, self.internal_closing_signed(their_node_id, msg), *their_node_id);
}
fn handle_update_add_htlc(&self, their_node_id: &PublicKey, msg: &msgs::UpdateAddHTLC) {
let _ = self.total_consistency_lock.read().unwrap();
- let res = self.internal_update_add_htlc(their_node_id, msg);
- if res.is_err() {
- let mut channel_state_lock = self.channel_state.lock().unwrap();
- let _ = handle_error!(self, res, *their_node_id, channel_state_lock);
- }
+ let _ = handle_error!(self, self.internal_update_add_htlc(their_node_id, msg), *their_node_id);
}
fn handle_update_fulfill_htlc(&self, their_node_id: &PublicKey, msg: &msgs::UpdateFulfillHTLC) {
let _ = self.total_consistency_lock.read().unwrap();
- let res = self.internal_update_fulfill_htlc(their_node_id, msg);
- if res.is_err() {
- let mut channel_state_lock = self.channel_state.lock().unwrap();
- let _ = handle_error!(self, res, *their_node_id, channel_state_lock);
- }
+ let _ = handle_error!(self, self.internal_update_fulfill_htlc(their_node_id, msg), *their_node_id);
}
fn handle_update_fail_htlc(&self, their_node_id: &PublicKey, msg: &msgs::UpdateFailHTLC) {
let _ = self.total_consistency_lock.read().unwrap();
- let res = self.internal_update_fail_htlc(their_node_id, msg);
- if res.is_err() {
- let mut channel_state_lock = self.channel_state.lock().unwrap();
- let _ = handle_error!(self, res, *their_node_id, channel_state_lock);
- }
+ let _ = handle_error!(self, self.internal_update_fail_htlc(their_node_id, msg), *their_node_id);
}
fn handle_update_fail_malformed_htlc(&self, their_node_id: &PublicKey, msg: &msgs::UpdateFailMalformedHTLC) {
let _ = self.total_consistency_lock.read().unwrap();
- let res = self.internal_update_fail_malformed_htlc(their_node_id, msg);
- if res.is_err() {
- let mut channel_state_lock = self.channel_state.lock().unwrap();
- let _ = handle_error!(self, res, *their_node_id, channel_state_lock);
- }
+ let _ = handle_error!(self, self.internal_update_fail_malformed_htlc(their_node_id, msg), *their_node_id);
}
fn handle_commitment_signed(&self, their_node_id: &PublicKey, msg: &msgs::CommitmentSigned) {
let _ = self.total_consistency_lock.read().unwrap();
- let res = self.internal_commitment_signed(their_node_id, msg);
- if res.is_err() {
- let mut channel_state_lock = self.channel_state.lock().unwrap();
- let _ = handle_error!(self, res, *their_node_id, channel_state_lock);
- }
+ let _ = handle_error!(self, self.internal_commitment_signed(their_node_id, msg), *their_node_id);
}
fn handle_revoke_and_ack(&self, their_node_id: &PublicKey, msg: &msgs::RevokeAndACK) {
let _ = self.total_consistency_lock.read().unwrap();
- let res = self.internal_revoke_and_ack(their_node_id, msg);
- if res.is_err() {
- let mut channel_state_lock = self.channel_state.lock().unwrap();
- let _ = handle_error!(self, res, *their_node_id, channel_state_lock);
- }
+ let _ = handle_error!(self, self.internal_revoke_and_ack(their_node_id, msg), *their_node_id);
}
fn handle_update_fee(&self, their_node_id: &PublicKey, msg: &msgs::UpdateFee) {
let _ = self.total_consistency_lock.read().unwrap();
- let res = self.internal_update_fee(their_node_id, msg);
- if res.is_err() {
- let mut channel_state_lock = self.channel_state.lock().unwrap();
- let _ = handle_error!(self, res, *their_node_id, channel_state_lock);
- }
+ let _ = handle_error!(self, self.internal_update_fee(their_node_id, msg), *their_node_id);
}
fn handle_announcement_signatures(&self, their_node_id: &PublicKey, msg: &msgs::AnnouncementSignatures) {
let _ = self.total_consistency_lock.read().unwrap();
- let res = self.internal_announcement_signatures(their_node_id, msg);
- if res.is_err() {
- let mut channel_state_lock = self.channel_state.lock().unwrap();
- let _ = handle_error!(self, res, *their_node_id, channel_state_lock);
- }
+ let _ = handle_error!(self, self.internal_announcement_signatures(their_node_id, msg), *their_node_id);
}
fn handle_channel_reestablish(&self, their_node_id: &PublicKey, msg: &msgs::ChannelReestablish) {
let _ = self.total_consistency_lock.read().unwrap();
- let res = self.internal_channel_reestablish(their_node_id, msg);
- if res.is_err() {
- let mut channel_state_lock = self.channel_state.lock().unwrap();
- let _ = handle_error!(self, res, *their_node_id, channel_state_lock);
- }
+ let _ = handle_error!(self, self.internal_channel_reestablish(their_node_id, msg), *their_node_id);
}
fn peer_disconnected(&self, their_node_id: &PublicKey, no_connection_possible: bool) {
do_test_commitment_revoked_fail_backward_exhaustive(true, false, true);
}
+#[test]
+fn fail_backward_pending_htlc_upon_channel_failure() {
+ let chanmon_cfgs = create_chanmon_cfgs(2);
+ let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
+ let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
+ let mut nodes = create_network(2, &node_cfgs, &node_chanmgrs);
+ let chan = create_announced_chan_between_nodes_with_value(&nodes, 0, 1, 1_000_000, 500_000_000, InitFeatures::supported(), InitFeatures::supported());
+
+ // Alice -> Bob: Route a payment but without Bob sending revoke_and_ack.
+ {
+ let (_, payment_hash) = get_payment_preimage_hash!(nodes[0]);
+ let route = nodes[0].router.get_route(&nodes[1].node.get_our_node_id(), None, &Vec::new(), 50_000, TEST_FINAL_CLTV).unwrap();
+ nodes[0].node.send_payment(route, payment_hash).unwrap();
+ check_added_monitors!(nodes[0], 1);
+
+ let payment_event = {
+ let mut events = nodes[0].node.get_and_clear_pending_msg_events();
+ assert_eq!(events.len(), 1);
+ SendEvent::from_event(events.remove(0))
+ };
+ assert_eq!(payment_event.node_id, nodes[1].node.get_our_node_id());
+ assert_eq!(payment_event.msgs.len(), 1);
+ }
+
+ // Alice -> Bob: Route another payment but now Alice waits for Bob's earlier revoke_and_ack.
+ let (_, failed_payment_hash) = get_payment_preimage_hash!(nodes[0]);
+ {
+ let route = nodes[0].router.get_route(&nodes[1].node.get_our_node_id(), None, &Vec::new(), 50_000, TEST_FINAL_CLTV).unwrap();
+ nodes[0].node.send_payment(route, failed_payment_hash).unwrap();
+ check_added_monitors!(nodes[0], 0);
+
+ assert!(nodes[0].node.get_and_clear_pending_msg_events().is_empty());
+ }
+
+ // Alice <- Bob: Send a malformed update_add_htlc so Alice fails the channel.
+ {
+ let route = nodes[1].router.get_route(&nodes[0].node.get_our_node_id(), None, &Vec::new(), 50_000, TEST_FINAL_CLTV).unwrap();
+ let (_, payment_hash) = get_payment_preimage_hash!(nodes[1]);
+
+ let secp_ctx = Secp256k1::new();
+ let session_priv = {
+ let mut session_key = [0; 32];
+ let mut rng = thread_rng();
+ rng.fill_bytes(&mut session_key);
+ SecretKey::from_slice(&session_key).expect("RNG is bad!")
+ };
+
+ let current_height = nodes[1].node.latest_block_height.load(Ordering::Acquire) as u32 + 1;
+ let (onion_payloads, _amount_msat, cltv_expiry) = onion_utils::build_onion_payloads(&route, current_height).unwrap();
+ let onion_keys = onion_utils::construct_onion_keys(&secp_ctx, &route, &session_priv).unwrap();
+ let onion_routing_packet = onion_utils::construct_onion_packet(onion_payloads, onion_keys, [0; 32], &payment_hash);
+
+ // Send a 0-msat update_add_htlc to fail the channel.
+ let update_add_htlc = msgs::UpdateAddHTLC {
+ channel_id: chan.2,
+ htlc_id: 0,
+ amount_msat: 0,
+ payment_hash,
+ cltv_expiry,
+ onion_routing_packet,
+ };
+ nodes[0].node.handle_update_add_htlc(&nodes[1].node.get_our_node_id(), &update_add_htlc);
+ }
+
+ // Check that Alice fails backward the pending HTLC from the second payment.
+ expect_payment_failed!(nodes[0], failed_payment_hash, true);
+ check_closed_broadcast!(nodes[0], true);
+ check_added_monitors!(nodes[0], 1);
+}
+
#[test]
fn test_htlc_ignore_latest_remote_commitment() {
// Test that HTLC transactions spending the latest remote commitment transaction are simply