/// probably increase this significantly.
const MIN_HTLC_RELAY_HOLDING_CELL_MILLIS: u32 = 50;
-pub(super) struct HTLCForwardInfo {
- prev_short_channel_id: u64,
- prev_htlc_id: u64,
- #[cfg(test)]
- pub(super) forward_info: PendingForwardHTLCInfo,
- #[cfg(not(test))]
- forward_info: PendingForwardHTLCInfo,
+pub(super) enum HTLCForwardInfo {
+ AddHTLC {
+ prev_short_channel_id: u64,
+ prev_htlc_id: u64,
+ forward_info: PendingForwardHTLCInfo,
+ },
+ FailHTLC {
+ htlc_id: u64,
+ err_packet: msgs::OnionErrorPacket,
+ },
}
/// For events which result in both a RevokeAndACK and a CommitmentUpdate, by default they should
Some(chan_id) => chan_id.clone(),
None => {
failed_forwards.reserve(pending_forwards.len());
- for HTLCForwardInfo { prev_short_channel_id, prev_htlc_id, forward_info } in pending_forwards.drain(..) {
- let htlc_source = HTLCSource::PreviousHopData(HTLCPreviousHopData {
- short_channel_id: prev_short_channel_id,
- htlc_id: prev_htlc_id,
- incoming_packet_shared_secret: forward_info.incoming_shared_secret,
- });
- failed_forwards.push((htlc_source, forward_info.payment_hash, 0x4000 | 10, None));
+ for forward_info in pending_forwards.drain(..) {
+ match forward_info {
+ HTLCForwardInfo::AddHTLC { prev_short_channel_id, prev_htlc_id, forward_info } => {
+ let htlc_source = HTLCSource::PreviousHopData(HTLCPreviousHopData {
+ short_channel_id: prev_short_channel_id,
+ htlc_id: prev_htlc_id,
+ incoming_packet_shared_secret: forward_info.incoming_shared_secret,
+ });
+ failed_forwards.push((htlc_source, forward_info.payment_hash, 0x4000 | 10, None));
+ },
+ HTLCForwardInfo::FailHTLC { .. } => {
+ // Channel went away before we could fail it. This implies
+ // the channel is now on chain and our counterparty is
+ // trying to broadcast the HTLC-Timeout, but that's their
+ // problem, not ours.
+ }
+ }
}
continue;
}
let forward_chan = &mut channel_state.by_id.get_mut(&forward_chan_id).unwrap();
let mut add_htlc_msgs = Vec::new();
- for HTLCForwardInfo { prev_short_channel_id, prev_htlc_id, forward_info } in pending_forwards.drain(..) {
- let htlc_source = HTLCSource::PreviousHopData(HTLCPreviousHopData {
- short_channel_id: prev_short_channel_id,
- htlc_id: prev_htlc_id,
- incoming_packet_shared_secret: forward_info.incoming_shared_secret,
- });
- match forward_chan.send_htlc(forward_info.amt_to_forward, forward_info.payment_hash, forward_info.outgoing_cltv_value, htlc_source.clone(), forward_info.onion_packet.unwrap()) {
- Err(_e) => {
- let chan_update = self.get_channel_update(forward_chan).unwrap();
- failed_forwards.push((htlc_source, forward_info.payment_hash, 0x1000 | 7, Some(chan_update)));
- continue;
+ let mut fail_htlc_msgs = Vec::new();
+ for forward_info in pending_forwards.drain(..) {
+ match forward_info {
+ HTLCForwardInfo::AddHTLC { prev_short_channel_id, prev_htlc_id, forward_info } => {
+ log_trace!(self, "Adding HTLC from short id {} with payment_hash {} to channel with short id {} after delay", log_bytes!(forward_info.payment_hash.0), prev_short_channel_id, short_chan_id);
+ let htlc_source = HTLCSource::PreviousHopData(HTLCPreviousHopData {
+ short_channel_id: prev_short_channel_id,
+ htlc_id: prev_htlc_id,
+ incoming_packet_shared_secret: forward_info.incoming_shared_secret,
+ });
+ match forward_chan.send_htlc(forward_info.amt_to_forward, forward_info.payment_hash, forward_info.outgoing_cltv_value, htlc_source.clone(), forward_info.onion_packet.unwrap()) {
+ Err(e) => {
+ if let ChannelError::Ignore(msg) = e {
+ log_trace!(self, "Failed to forward HTLC with payment_hash {}: {}", log_bytes!(forward_info.payment_hash.0), msg);
+ } else {
+ panic!("Stated return value requirements in send_htlc() were not met");
+ }
+ let chan_update = self.get_channel_update(forward_chan).unwrap();
+ failed_forwards.push((htlc_source, forward_info.payment_hash, 0x1000 | 7, Some(chan_update)));
+ continue;
+ },
+ Ok(update_add) => {
+ match update_add {
+ Some(msg) => { add_htlc_msgs.push(msg); },
+ None => {
+ // Nothing to do here...we're waiting on a remote
+ // revoke_and_ack before we can add anymore HTLCs. The Channel
+ // will automatically handle building the update_add_htlc and
+ // commitment_signed messages when we can.
+ // TODO: Do some kind of timer to set the channel as !is_live()
+ // as we don't really want others relying on us relaying through
+ // this channel currently :/.
+ }
+ }
+ }
+ }
},
- Ok(update_add) => {
- match update_add {
- Some(msg) => { add_htlc_msgs.push(msg); },
- None => {
+ HTLCForwardInfo::FailHTLC { htlc_id, err_packet } => {
+ log_trace!(self, "Failing HTLC back to channel with short id {} after delay", short_chan_id);
+ match forward_chan.get_update_fail_htlc(htlc_id, err_packet) {
+ Err(e) => {
+ if let ChannelError::Ignore(msg) = e {
+ log_trace!(self, "Failed to fail backwards to short_id {}: {}", short_chan_id, msg);
+ } else {
+ panic!("Stated return value requirements in get_update_fail_htlc() were not met");
+ }
+ // fail-backs are best-effort, we probably already have one
+ // pending, and if not that's OK, if not, the channel is on
+ // the chain and sending the HTLC-Timeout is their problem.
+ continue;
+ },
+ Ok(Some(msg)) => { fail_htlc_msgs.push(msg); },
+ Ok(None) => {
// Nothing to do here...we're waiting on a remote
- // revoke_and_ack before we can add anymore HTLCs. The Channel
- // will automatically handle building the update_add_htlc and
- // commitment_signed messages when we can.
- // TODO: Do some kind of timer to set the channel as !is_live()
- // as we don't really want others relying on us relaying through
- // this channel currently :/.
+ // revoke_and_ack before we can update the commitment
+ // transaction. The Channel will automatically handle
+ // building the update_fail_htlc and commitment_signed
+ // messages when we can.
+ // We don't need any kind of timer here as they should fail
+ // the channel onto the chain if they can't get our
+ // update_fail_htlc in time, its not our problem.
}
}
- }
+ },
}
}
- if !add_htlc_msgs.is_empty() {
+ if !add_htlc_msgs.is_empty() || !fail_htlc_msgs.is_empty() {
let (commitment_msg, monitor) = match forward_chan.send_commitment() {
Ok(res) => res,
Err(e) => {
updates: msgs::CommitmentUpdate {
update_add_htlcs: add_htlc_msgs,
update_fulfill_htlcs: Vec::new(),
- update_fail_htlcs: Vec::new(),
+ update_fail_htlcs: fail_htlc_msgs,
update_fail_malformed_htlcs: Vec::new(),
update_fee: None,
commitment_signed: commitment_msg,
});
}
} else {
- for HTLCForwardInfo { prev_short_channel_id, prev_htlc_id, forward_info } in pending_forwards.drain(..) {
- let prev_hop_data = HTLCPreviousHopData {
- short_channel_id: prev_short_channel_id,
- htlc_id: prev_htlc_id,
- incoming_packet_shared_secret: forward_info.incoming_shared_secret,
- };
- match channel_state.claimable_htlcs.entry(forward_info.payment_hash) {
- hash_map::Entry::Occupied(mut entry) => entry.get_mut().push(prev_hop_data),
- hash_map::Entry::Vacant(entry) => { entry.insert(vec![prev_hop_data]); },
- };
- new_events.push(events::Event::PaymentReceived {
- payment_hash: forward_info.payment_hash,
- amt: forward_info.amt_to_forward,
- });
+ for forward_info in pending_forwards.drain(..) {
+ match forward_info {
+ HTLCForwardInfo::AddHTLC { prev_short_channel_id, prev_htlc_id, forward_info } => {
+ let prev_hop_data = HTLCPreviousHopData {
+ short_channel_id: prev_short_channel_id,
+ htlc_id: prev_htlc_id,
+ incoming_packet_shared_secret: forward_info.incoming_shared_secret,
+ };
+ match channel_state.claimable_htlcs.entry(forward_info.payment_hash) {
+ hash_map::Entry::Occupied(mut entry) => entry.get_mut().push(prev_hop_data),
+ hash_map::Entry::Vacant(entry) => { entry.insert(vec![prev_hop_data]); },
+ };
+ new_events.push(events::Event::PaymentReceived {
+ payment_hash: forward_info.payment_hash,
+ amt: forward_info.amt_to_forward,
+ });
+ },
+ HTLCForwardInfo::FailHTLC { .. } => {
+ panic!("Got pending fail of our own HTLC");
+ }
+ }
}
}
}
/// drop it). In other words, no assumptions are made that entries in claimable_htlcs point to
/// still-available channels.
fn fail_htlc_backwards_internal(&self, mut channel_state_lock: MutexGuard<ChannelHolder>, source: HTLCSource, payment_hash: &PaymentHash, onion_error: HTLCFailReason) {
+ //TODO: There is a timing attack here where if a node fails an HTLC back to us they can
+ //identify whether we sent it or not based on the (I presume) very different runtime
+ //between the branches here. We should make this async and move it into the forward HTLCs
+ //timer handling.
match source {
HTLCSource::OutboundRoute { ref route, .. } => {
log_trace!(self, "Failing outbound payment HTLC with payment_hash {}", log_bytes!(payment_hash.0));
}
};
- let channel_state = channel_state_lock.borrow_parts();
-
- let chan_id = match channel_state.short_to_id.get(&short_channel_id) {
- Some(chan_id) => chan_id.clone(),
- None => return
- };
-
- let chan = channel_state.by_id.get_mut(&chan_id).unwrap();
- match chan.get_update_fail_htlc_and_commit(htlc_id, err_packet) {
- Ok(Some((msg, commitment_msg, chan_monitor))) => {
- if let Err(_e) = self.monitor.add_update_monitor(chan_monitor.get_funding_txo().unwrap(), chan_monitor) {
- unimplemented!();
- }
- channel_state.pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs {
- node_id: chan.get_their_node_id(),
- updates: msgs::CommitmentUpdate {
- update_add_htlcs: Vec::new(),
- update_fulfill_htlcs: Vec::new(),
- update_fail_htlcs: vec![msg],
- update_fail_malformed_htlcs: Vec::new(),
- update_fee: None,
- commitment_signed: commitment_msg,
- },
- });
- },
- Ok(None) => {},
- Err(_e) => {
- //TODO: Do something with e?
- return;
+ let mut forward_event = None;
+ if channel_state_lock.forward_htlcs.is_empty() {
+ forward_event = Some(Instant::now() + Duration::from_millis(((rng::rand_f32() * 4.0 + 1.0) * MIN_HTLC_RELAY_HOLDING_CELL_MILLIS as f32) as u64));
+ channel_state_lock.next_forward = forward_event.unwrap();
+ }
+ match channel_state_lock.forward_htlcs.entry(short_channel_id) {
+ hash_map::Entry::Occupied(mut entry) => {
+ entry.get_mut().push(HTLCForwardInfo::FailHTLC { htlc_id, err_packet });
},
+ hash_map::Entry::Vacant(entry) => {
+ entry.insert(vec!(HTLCForwardInfo::FailHTLC { htlc_id, err_packet }));
+ }
+ }
+ mem::drop(channel_state_lock);
+ if let Some(time) = forward_event {
+ let mut pending_events = self.pending_events.lock().unwrap();
+ pending_events.push(events::Event::PendingHTLCsForwardable {
+ time_forwardable: time
+ });
}
},
}
for (forward_info, prev_htlc_id) in pending_forwards.drain(..) {
match channel_state.forward_htlcs.entry(forward_info.short_channel_id) {
hash_map::Entry::Occupied(mut entry) => {
- entry.get_mut().push(HTLCForwardInfo { prev_short_channel_id, prev_htlc_id, forward_info });
+ entry.get_mut().push(HTLCForwardInfo::AddHTLC { prev_short_channel_id, prev_htlc_id, forward_info });
},
hash_map::Entry::Vacant(entry) => {
- entry.insert(vec!(HTLCForwardInfo { prev_short_channel_id, prev_htlc_id, forward_info }));
+ entry.insert(vec!(HTLCForwardInfo::AddHTLC { prev_short_channel_id, prev_htlc_id, forward_info }));
}
}
}
}
}
-impl_writeable!(HTLCForwardInfo, 0, {
- prev_short_channel_id,
- prev_htlc_id,
- forward_info
-});
+impl Writeable for HTLCForwardInfo {
+ fn write<W: Writer>(&self, writer: &mut W) -> Result<(), ::std::io::Error> {
+ match self {
+ &HTLCForwardInfo::AddHTLC { ref prev_short_channel_id, ref prev_htlc_id, ref forward_info } => {
+ 0u8.write(writer)?;
+ prev_short_channel_id.write(writer)?;
+ prev_htlc_id.write(writer)?;
+ forward_info.write(writer)?;
+ },
+ &HTLCForwardInfo::FailHTLC { ref htlc_id, ref err_packet } => {
+ 1u8.write(writer)?;
+ htlc_id.write(writer)?;
+ err_packet.write(writer)?;
+ },
+ }
+ Ok(())
+ }
+}
+
+impl<R: ::std::io::Read> Readable<R> for HTLCForwardInfo {
+ fn read(reader: &mut R) -> Result<HTLCForwardInfo, DecodeError> {
+ match <u8 as Readable<R>>::read(reader)? {
+ 0 => Ok(HTLCForwardInfo::AddHTLC {
+ prev_short_channel_id: Readable::read(reader)?,
+ prev_htlc_id: Readable::read(reader)?,
+ forward_info: Readable::read(reader)?,
+ }),
+ 1 => Ok(HTLCForwardInfo::FailHTLC {
+ htlc_id: Readable::read(reader)?,
+ err_packet: Readable::read(reader)?,
+ }),
+ _ => Err(DecodeError::InvalidValue),
+ }
+ }
+}
impl Writeable for ChannelManager {
fn write<W: Writer>(&self, writer: &mut W) -> Result<(), ::std::io::Error> {