X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning%2Fsrc%2Fln%2Fchannelmanager.rs;h=bcde81dd922b59bde534f0a1506b08924687ffb6;hb=71f98c13f8e5cab69f3084f0c8825132bc0111b7;hp=ad0d3b1897b3b2f696c7c6d281eb5806c66ef9e3;hpb=da7d53fdf2a0fb7393eea01fe5bff6efbb05dabc;p=rust-lightning diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index ad0d3b18..bcde81dd 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -76,6 +76,7 @@ enum PendingForwardReceiveHTLCInfo { }, Receive { payment_data: Option, + incoming_cltv_expiry: u32, // Used to track when we should expire pending HTLCs that go unclaimed }, } @@ -125,6 +126,7 @@ struct ClaimableHTLC { src: HTLCPreviousHopData, value: u64, payment_data: Option, + cltv_expiry: u32, } /// Tracks the inbound corresponding to an outbound HTLC @@ -491,21 +493,38 @@ pub enum PaymentSendFailure { } macro_rules! handle_error { - ($self: ident, $internal: expr, $their_node_id: expr, $locked_channel_state: expr) => { + ($self: ident, $internal: expr, $their_node_id: expr) => { match $internal { Ok(msg) => Ok(msg), Err(MsgHandleErrInternal { err, shutdown_finish }) => { + let mut channel_state = None; if let Some((shutdown_res, update_option)) = shutdown_finish { $self.finish_force_close_channel(shutdown_res); if let Some(update) = update_option { - $locked_channel_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { + channel_state = Some($self.channel_state.lock().unwrap()); + channel_state.as_mut().unwrap().pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { msg: update }); } } + #[cfg(debug_assertions)] + { + // In testing, we always lock here to ensure there are no deadlocks where we + // were holding the lock coming into the macro but didn't catch it because we + // didn't generate an action and didn't have any HTLCs to fail backwards in the + // finish_force_close_channel. + if channel_state.is_none() { + channel_state = Some($self.channel_state.lock().unwrap()); + } + } log_error!($self, "{}", err.err); if let msgs::ErrorAction::IgnoreError = err.action { - } else { $locked_channel_state.pending_msg_events.push(events::MessageSendEvent::HandleError { node_id: $their_node_id, action: err.action.clone() }); } + } else { + if channel_state.is_none() { + channel_state = Some($self.channel_state.lock().unwrap()); + } + channel_state.as_mut().unwrap().pending_msg_events.push(events::MessageSendEvent::HandleError { node_id: $their_node_id, action: err.action.clone() }); + } // Return error in case higher-API need one Err(err) }, @@ -673,6 +692,7 @@ impl ChannelManager where M::T let res = ChannelManager { default_configuration: config.clone(), genesis_hash: genesis_block(network).header.bitcoin_hash(), + //genesis_hash: Sha256dHash::from_hex("0f9188f13cb7b2c71f2a335e3a4fc328bf5beb436012afca590b1a11466e2206").unwrap(), fee_estimator: feeest.clone(), monitor, tx_broadcaster, @@ -1013,7 +1033,10 @@ impl ChannelManager where M::T // delay) once they've send us a commitment_signed! PendingHTLCStatus::Forward(PendingHTLCInfo { - type_data: PendingForwardReceiveHTLCInfo::Receive { payment_data }, + type_data: PendingForwardReceiveHTLCInfo::Receive { + payment_data, + incoming_cltv_expiry: msg.cltv_expiry, + }, payment_hash: msg.payment_hash.clone(), incoming_shared_secret: shared_secret, amt_to_forward: next_hop_data.amt_to_forward, @@ -1255,8 +1278,8 @@ impl ChannelManager where M::T let _ = self.total_consistency_lock.read().unwrap(); - let mut channel_lock = self.channel_state.lock().unwrap(); let err: Result<(), _> = loop { + let mut channel_lock = self.channel_state.lock().unwrap(); let id = match channel_lock.short_to_id.get(&path.first().unwrap().short_channel_id) { None => check_res_push!(Err(APIError::ChannelUnavailable{err: "No channel available with first hop!"})), Some(id) => id.clone(), @@ -1306,7 +1329,7 @@ impl ChannelManager where M::T continue 'path_loop; }; - match handle_error!(self, err, path.first().unwrap().pubkey, channel_lock) { + match handle_error!(self, err, path.first().unwrap().pubkey) { Ok(_) => unreachable!(), Err(e) => { check_res_push!(Err(APIError::ChannelUnavailable { err: e.err })); @@ -1348,8 +1371,7 @@ impl ChannelManager where M::T let _ = self.total_consistency_lock.read().unwrap(); let (mut chan, msg, chan_monitor) = { - let mut channel_state = self.channel_state.lock().unwrap(); - let (res, chan) = match channel_state.by_id.remove(temporary_channel_id) { + let (res, chan) = match self.channel_state.lock().unwrap().by_id.remove(temporary_channel_id) { Some(mut chan) => { (chan.get_outbound_funding_created(funding_txo) .map_err(|e| if let ChannelError::Close(msg) = e { @@ -1359,7 +1381,7 @@ impl ChannelManager where M::T }, None => return }; - match handle_error!(self, res, chan.get_their_node_id(), channel_state) { + match handle_error!(self, res, chan.get_their_node_id()) { Ok(funding_msg) => { (chan, funding_msg.0, funding_msg.1) }, @@ -1371,12 +1393,9 @@ impl ChannelManager where M::T if let Err(e) = self.monitor.add_update_monitor(chan_monitor.get_funding_txo().unwrap(), chan_monitor) { match e { ChannelMonitorUpdateErr::PermanentFailure => { - { - let mut channel_state = self.channel_state.lock().unwrap(); - match handle_error!(self, Err(MsgHandleErrInternal::from_finish_shutdown("ChannelMonitor storage failure", *temporary_channel_id, chan.force_shutdown(), None)), chan.get_their_node_id(), channel_state) { - Err(_) => { return; }, - Ok(()) => unreachable!(), - } + match handle_error!(self, Err(MsgHandleErrInternal::from_finish_shutdown("ChannelMonitor storage failure", *temporary_channel_id, chan.force_shutdown(), None)), chan.get_their_node_id()) { + Err(_) => { return; }, + Ok(()) => unreachable!(), } }, ChannelMonitorUpdateErr::TemporaryFailure => { @@ -1594,10 +1613,8 @@ impl ChannelManager where M::T }, ChannelError::CloseDelayBroadcast { .. } => { panic!("Wait is only generated on receipt of channel_reestablish, which is handled by try_chan_entry, we don't bother to support it here"); } }; - match handle_error!(self, err, their_node_id, channel_state) { - Ok(_) => unreachable!(), - Err(_) => { continue; }, - } + handle_errors.push((their_node_id, err)); + continue; } }; if let Err(e) = self.monitor.add_update_monitor(monitor.get_funding_txo().unwrap(), monitor) { @@ -1623,7 +1640,7 @@ impl ChannelManager where M::T for forward_info in pending_forwards.drain(..) { match forward_info { HTLCForwardInfo::AddHTLC { prev_short_channel_id, prev_htlc_id, forward_info: PendingHTLCInfo { - type_data: PendingForwardReceiveHTLCInfo::Receive { payment_data }, + type_data: PendingForwardReceiveHTLCInfo::Receive { payment_data, incoming_cltv_expiry }, incoming_shared_secret, payment_hash, amt_to_forward, .. }, } => { let prev_hop_data = HTLCPreviousHopData { short_channel_id: prev_short_channel_id, @@ -1639,6 +1656,7 @@ impl ChannelManager where M::T src: prev_hop_data, value: amt_to_forward, payment_data: payment_data.clone(), + cltv_expiry: incoming_cltv_expiry, }); if let &Some(ref data) = &payment_data { for htlc in htlcs.iter() { @@ -1689,11 +1707,8 @@ impl ChannelManager where M::T self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), htlc_source, &payment_hash, failure_reason); } - if handle_errors.len() > 0 { - let mut channel_state_lock = self.channel_state.lock().unwrap(); - for (their_node_id, err) in handle_errors.drain(..) { - let _ = handle_error!(self, err, their_node_id, channel_state_lock); - } + for (their_node_id, err) in handle_errors.drain(..) { + let _ = handle_error!(self, err, their_node_id); } if new_events.is_empty() { return } @@ -1956,7 +1971,8 @@ impl ChannelManager where M::T return; }; - let _ = handle_error!(self, err, their_node_id, channel_state_lock); + mem::drop(channel_state_lock); + let _ = handle_error!(self, err, their_node_id); } /// Gets the node_id held by this ChannelManager @@ -2687,9 +2703,9 @@ impl ChannelManager where M::T #[doc(hidden)] pub fn update_fee(&self, channel_id: [u8;32], feerate_per_kw: u64) -> Result<(), APIError> { let _ = self.total_consistency_lock.read().unwrap(); - let mut channel_state_lock = self.channel_state.lock().unwrap(); let their_node_id; let err: Result<(), _> = loop { + let mut channel_state_lock = self.channel_state.lock().unwrap(); let channel_state = &mut *channel_state_lock; match channel_state.by_id.entry(channel_id) { @@ -2728,7 +2744,7 @@ impl ChannelManager where M::T return Ok(()) }; - match handle_error!(self, err, their_node_id, channel_state_lock) { + match handle_error!(self, err, their_node_id) { Ok(_) => unreachable!(), Err(e) => { Err(APIError::APIMisuseError { err: e.err })} } @@ -2791,26 +2807,33 @@ impl ChainListener for ChannelM log_trace!(self, "Block {} at height {} connected with {} txn matched", header_hash, height, txn_matched.len()); let _ = self.total_consistency_lock.read().unwrap(); let mut failed_channels = Vec::new(); + let mut timed_out_htlcs = Vec::new(); { let mut channel_lock = self.channel_state.lock().unwrap(); let channel_state = &mut *channel_lock; let short_to_id = &mut channel_state.short_to_id; let pending_msg_events = &mut channel_state.pending_msg_events; channel_state.by_id.retain(|_, channel| { - let chan_res = channel.block_connected(header, height, txn_matched, indexes_of_txn_matched); - if let Ok(Some(funding_locked)) = chan_res { - pending_msg_events.push(events::MessageSendEvent::SendFundingLocked { - node_id: channel.get_their_node_id(), - msg: funding_locked, - }); - if let Some(announcement_sigs) = self.get_announcement_sigs(channel) { - pending_msg_events.push(events::MessageSendEvent::SendAnnouncementSignatures { + let res = channel.block_connected(header, height, txn_matched, indexes_of_txn_matched); + if let Ok((chan_res, mut timed_out_pending_htlcs)) = res { + timed_out_htlcs.reserve(timed_out_pending_htlcs.len()); + for (htlc_src, payment_hash, value) in timed_out_pending_htlcs.drain(..) { + timed_out_htlcs.push((htlc_src, payment_hash, value)); + } + if let Some(funding_locked) = chan_res { + pending_msg_events.push(events::MessageSendEvent::SendFundingLocked { node_id: channel.get_their_node_id(), - msg: announcement_sigs, + msg: funding_locked, }); + if let Some(announcement_sigs) = self.get_announcement_sigs(channel) { + pending_msg_events.push(events::MessageSendEvent::SendAnnouncementSignatures { + node_id: channel.get_their_node_id(), + msg: announcement_sigs, + }); + } + short_to_id.insert(channel.get_short_channel_id().unwrap(), channel.channel_id()); } - short_to_id.insert(channel.get_short_channel_id().unwrap(), channel.channel_id()); - } else if let Err(e) = chan_res { + } else if let Err(e) = res { pending_msg_events.push(events::MessageSendEvent::HandleError { node_id: channel.get_their_node_id(), action: msgs::ErrorAction::SendErrorMessage { msg: e }, @@ -2857,10 +2880,29 @@ impl ChainListener for ChannelM } true }); + + channel_state.claimable_htlcs.retain(|&(ref payment_hash, _), htlcs| { + htlcs.retain(|htlc| { + if height >= htlc.cltv_expiry - CLTV_CLAIM_BUFFER - LATENCY_GRACE_PERIOD_BLOCKS { + timed_out_htlcs.push((HTLCSource::PreviousHopData(htlc.src.clone()), payment_hash.clone(), htlc.value)); + false + } else { true } + }); + !htlcs.is_empty() + }); } for failure in failed_channels.drain(..) { self.finish_force_close_channel(failure); } + + for (source, payment_hash, value) in timed_out_htlcs.drain(..) { + // Call it preimage_unknown as the issue, ultimately, is that the user failed to + // provide us a preimage within the cltv_expiry time window. + self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), source, &payment_hash, HTLCFailReason::Reason { + failure_code: 0x4000 | 15, + data: byte_utils::be64_to_array(value).to_vec() + }); + } self.latest_block_height.store(height as usize, Ordering::Release); *self.last_block_hash.try_lock().expect("block_(dis)connected must not be called in parallel") = header_hash; } @@ -2902,146 +2944,82 @@ impl ChainListener for ChannelM impl ChannelMessageHandler for ChannelManager where M::Target: ManyChannelMonitor { fn handle_open_channel(&self, their_node_id: &PublicKey, their_features: InitFeatures, msg: &msgs::OpenChannel) { let _ = self.total_consistency_lock.read().unwrap(); - let res = self.internal_open_channel(their_node_id, their_features, msg); - if res.is_err() { - let mut channel_state_lock = self.channel_state.lock().unwrap(); - let _ = handle_error!(self, res, *their_node_id, channel_state_lock); - } + let _ = handle_error!(self, self.internal_open_channel(their_node_id, their_features, msg), *their_node_id); } fn handle_accept_channel(&self, their_node_id: &PublicKey, their_features: InitFeatures, msg: &msgs::AcceptChannel) { let _ = self.total_consistency_lock.read().unwrap(); - let res = self.internal_accept_channel(their_node_id, their_features, msg); - if res.is_err() { - let mut channel_state_lock = self.channel_state.lock().unwrap(); - let _ = handle_error!(self, res, *their_node_id, channel_state_lock); - } + let _ = handle_error!(self, self.internal_accept_channel(their_node_id, their_features, msg), *their_node_id); } fn handle_funding_created(&self, their_node_id: &PublicKey, msg: &msgs::FundingCreated) { let _ = self.total_consistency_lock.read().unwrap(); - let res = self.internal_funding_created(their_node_id, msg); - if res.is_err() { - let mut channel_state_lock = self.channel_state.lock().unwrap(); - let _ = handle_error!(self, res, *their_node_id, channel_state_lock); - } + let _ = handle_error!(self, self.internal_funding_created(their_node_id, msg), *their_node_id); } fn handle_funding_signed(&self, their_node_id: &PublicKey, msg: &msgs::FundingSigned) { let _ = self.total_consistency_lock.read().unwrap(); - let res = self.internal_funding_signed(their_node_id, msg); - if res.is_err() { - let mut channel_state_lock = self.channel_state.lock().unwrap(); - let _ = handle_error!(self, res, *their_node_id, channel_state_lock); - } + let _ = handle_error!(self, self.internal_funding_signed(their_node_id, msg), *their_node_id); } fn handle_funding_locked(&self, their_node_id: &PublicKey, msg: &msgs::FundingLocked) { let _ = self.total_consistency_lock.read().unwrap(); - let res = self.internal_funding_locked(their_node_id, msg); - if res.is_err() { - let mut channel_state_lock = self.channel_state.lock().unwrap(); - let _ = handle_error!(self, res, *their_node_id, channel_state_lock); - } + let _ = handle_error!(self, self.internal_funding_locked(their_node_id, msg), *their_node_id); } fn handle_shutdown(&self, their_node_id: &PublicKey, msg: &msgs::Shutdown) { let _ = self.total_consistency_lock.read().unwrap(); - let res = self.internal_shutdown(their_node_id, msg); - if res.is_err() { - let mut channel_state_lock = self.channel_state.lock().unwrap(); - let _ = handle_error!(self, res, *their_node_id, channel_state_lock); - } + let _ = handle_error!(self, self.internal_shutdown(their_node_id, msg), *their_node_id); } fn handle_closing_signed(&self, their_node_id: &PublicKey, msg: &msgs::ClosingSigned) { let _ = self.total_consistency_lock.read().unwrap(); - let res = self.internal_closing_signed(their_node_id, msg); - if res.is_err() { - let mut channel_state_lock = self.channel_state.lock().unwrap(); - let _ = handle_error!(self, res, *their_node_id, channel_state_lock); - } + let _ = handle_error!(self, self.internal_closing_signed(their_node_id, msg), *their_node_id); } fn handle_update_add_htlc(&self, their_node_id: &PublicKey, msg: &msgs::UpdateAddHTLC) { let _ = self.total_consistency_lock.read().unwrap(); - let res = self.internal_update_add_htlc(their_node_id, msg); - if res.is_err() { - let mut channel_state_lock = self.channel_state.lock().unwrap(); - let _ = handle_error!(self, res, *their_node_id, channel_state_lock); - } + let _ = handle_error!(self, self.internal_update_add_htlc(their_node_id, msg), *their_node_id); } fn handle_update_fulfill_htlc(&self, their_node_id: &PublicKey, msg: &msgs::UpdateFulfillHTLC) { let _ = self.total_consistency_lock.read().unwrap(); - let res = self.internal_update_fulfill_htlc(their_node_id, msg); - if res.is_err() { - let mut channel_state_lock = self.channel_state.lock().unwrap(); - let _ = handle_error!(self, res, *their_node_id, channel_state_lock); - } + let _ = handle_error!(self, self.internal_update_fulfill_htlc(their_node_id, msg), *their_node_id); } fn handle_update_fail_htlc(&self, their_node_id: &PublicKey, msg: &msgs::UpdateFailHTLC) { let _ = self.total_consistency_lock.read().unwrap(); - let res = self.internal_update_fail_htlc(their_node_id, msg); - if res.is_err() { - let mut channel_state_lock = self.channel_state.lock().unwrap(); - let _ = handle_error!(self, res, *their_node_id, channel_state_lock); - } + let _ = handle_error!(self, self.internal_update_fail_htlc(their_node_id, msg), *their_node_id); } fn handle_update_fail_malformed_htlc(&self, their_node_id: &PublicKey, msg: &msgs::UpdateFailMalformedHTLC) { let _ = self.total_consistency_lock.read().unwrap(); - let res = self.internal_update_fail_malformed_htlc(their_node_id, msg); - if res.is_err() { - let mut channel_state_lock = self.channel_state.lock().unwrap(); - let _ = handle_error!(self, res, *their_node_id, channel_state_lock); - } + let _ = handle_error!(self, self.internal_update_fail_malformed_htlc(their_node_id, msg), *their_node_id); } fn handle_commitment_signed(&self, their_node_id: &PublicKey, msg: &msgs::CommitmentSigned) { let _ = self.total_consistency_lock.read().unwrap(); - let res = self.internal_commitment_signed(their_node_id, msg); - if res.is_err() { - let mut channel_state_lock = self.channel_state.lock().unwrap(); - let _ = handle_error!(self, res, *their_node_id, channel_state_lock); - } + let _ = handle_error!(self, self.internal_commitment_signed(their_node_id, msg), *their_node_id); } fn handle_revoke_and_ack(&self, their_node_id: &PublicKey, msg: &msgs::RevokeAndACK) { let _ = self.total_consistency_lock.read().unwrap(); - let res = self.internal_revoke_and_ack(their_node_id, msg); - if res.is_err() { - let mut channel_state_lock = self.channel_state.lock().unwrap(); - let _ = handle_error!(self, res, *their_node_id, channel_state_lock); - } + let _ = handle_error!(self, self.internal_revoke_and_ack(their_node_id, msg), *their_node_id); } fn handle_update_fee(&self, their_node_id: &PublicKey, msg: &msgs::UpdateFee) { let _ = self.total_consistency_lock.read().unwrap(); - let res = self.internal_update_fee(their_node_id, msg); - if res.is_err() { - let mut channel_state_lock = self.channel_state.lock().unwrap(); - let _ = handle_error!(self, res, *their_node_id, channel_state_lock); - } + let _ = handle_error!(self, self.internal_update_fee(their_node_id, msg), *their_node_id); } fn handle_announcement_signatures(&self, their_node_id: &PublicKey, msg: &msgs::AnnouncementSignatures) { let _ = self.total_consistency_lock.read().unwrap(); - let res = self.internal_announcement_signatures(their_node_id, msg); - if res.is_err() { - let mut channel_state_lock = self.channel_state.lock().unwrap(); - let _ = handle_error!(self, res, *their_node_id, channel_state_lock); - } + let _ = handle_error!(self, self.internal_announcement_signatures(their_node_id, msg), *their_node_id); } fn handle_channel_reestablish(&self, their_node_id: &PublicKey, msg: &msgs::ChannelReestablish) { let _ = self.total_consistency_lock.read().unwrap(); - let res = self.internal_channel_reestablish(their_node_id, msg); - if res.is_err() { - let mut channel_state_lock = self.channel_state.lock().unwrap(); - let _ = handle_error!(self, res, *their_node_id, channel_state_lock); - } + let _ = handle_error!(self, self.internal_channel_reestablish(their_node_id, msg), *their_node_id); } fn peer_disconnected(&self, their_node_id: &PublicKey, no_connection_possible: bool) { @@ -3197,9 +3175,10 @@ impl Writeable for PendingHTLCInfo { onion_packet.write(writer)?; short_channel_id.write(writer)?; }, - &PendingForwardReceiveHTLCInfo::Receive { ref payment_data } => { + &PendingForwardReceiveHTLCInfo::Receive { ref payment_data, ref incoming_cltv_expiry } => { 1u8.write(writer)?; payment_data.write(writer)?; + incoming_cltv_expiry.write(writer)?; }, } self.incoming_shared_secret.write(writer)?; @@ -3220,6 +3199,7 @@ impl Readable for PendingHTLCInfo { }, 1u8 => PendingForwardReceiveHTLCInfo::Receive { payment_data: Readable::read(reader)?, + incoming_cltv_expiry: Readable::read(reader)?, }, _ => return Err(DecodeError::InvalidValue), }, @@ -3429,6 +3409,7 @@ impl Writeable for ChannelManager htlc.src.write(writer)?; htlc.value.write(writer)?; htlc.payment_data.write(writer)?; + htlc.cltv_expiry.write(writer)?; } } @@ -3501,6 +3482,15 @@ pub struct ChannelManagerReadArgs<'a, ChanSigner: ChannelKeys, M: Deref> where M pub channel_monitors: &'a mut HashMap, } +// Implement ReadableArgs for an Arc'd ChannelManager to make it a bit easier to work with the +// SipmleArcChannelManager type: +impl<'a, R : ::std::io::Read, ChanSigner: ChannelKeys + Readable, M: Deref> ReadableArgs> for (Sha256dHash, Arc>) where M::Target: ManyChannelMonitor { + fn read(reader: &mut R, args: ChannelManagerReadArgs<'a, ChanSigner, M>) -> Result { + let (blockhash, chan_manager) = <(Sha256dHash, ChannelManager)>::read(reader, args)?; + Ok((blockhash, Arc::new(chan_manager))) + } +} + impl<'a, R : ::std::io::Read, ChanSigner: ChannelKeys + Readable, M: Deref> ReadableArgs> for (Sha256dHash, ChannelManager) where M::Target: ManyChannelMonitor { fn read(reader: &mut R, args: ChannelManagerReadArgs<'a, ChanSigner, M>) -> Result { let _ver: u8 = Readable::read(reader)?; @@ -3574,6 +3564,7 @@ impl<'a, R : ::std::io::Read, ChanSigner: ChannelKeys + Readable, M: Deref> R src: Readable::read(reader)?, value: Readable::read(reader)?, payment_data: Readable::read(reader)?, + cltv_expiry: Readable::read(reader)?, }); } claimable_htlcs.insert(payment_hash, previous_hops);