From: Matt Corallo <649246+TheBlueMatt@users.noreply.github.com> Date: Tue, 23 Oct 2018 20:45:50 +0000 (-0400) Subject: Merge pull request #213 from TheBlueMatt/2018-10-monitor-fail-pause X-Git-Tag: v0.0.12~289 X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=commitdiff_plain;h=3bcd911fcb112551525521dc3d16b88e07314318;hp=65b23d8d8a936ad5047eb1ec0115a9679ae5eca1;p=rust-lightning Merge pull request #213 from TheBlueMatt/2018-10-monitor-fail-pause Add ChannelManager support for monitor update failure in one place --- diff --git a/src/ln/channel.rs b/src/ln/channel.rs index 99280749..6b151138 100644 --- a/src/ln/channel.rs +++ b/src/ln/channel.rs @@ -248,28 +248,32 @@ enum ChannelState { /// "disconnected" and no updates are allowed until after we've done a channel_reestablish /// dance. PeerDisconnected = (1 << 7), + /// Flag which is set on ChannelFunded and FundingSent indicating the user has told us they + /// failed to update our ChannelMonitor somewhere and we should pause sending any outbound + /// messages until they've managed to do so. + MonitorUpdateFailed = (1 << 8), /// Flag which implies that we have sent a commitment_signed but are awaiting the responding /// revoke_and_ack message. During this time period, we can't generate new commitment_signed /// messages as then we will be unable to determine which HTLCs they included in their /// revoke_and_ack implicit ACK, so instead we have to hold them away temporarily to be sent /// later. /// Flag is set on ChannelFunded. - AwaitingRemoteRevoke = (1 << 8), + AwaitingRemoteRevoke = (1 << 9), /// Flag which is set on ChannelFunded or FundingSent after receiving a shutdown message from /// the remote end. If set, they may not add any new HTLCs to the channel, and we are expected /// to respond with our own shutdown message when possible. - RemoteShutdownSent = (1 << 9), + RemoteShutdownSent = (1 << 10), /// Flag which is set on ChannelFunded or FundingSent after sending a shutdown message. At this /// point, we may not add any new HTLCs to the channel. /// TODO: Investigate some kind of timeout mechanism by which point the remote end must provide /// us their shutdown. - LocalShutdownSent = (1 << 10), + LocalShutdownSent = (1 << 11), /// We've successfully negotiated a closing_signed dance. At this point ChannelManager is about /// to drop us, but we store this anyway. 
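// The hunk above shifts every flag after PeerDisconnected up by one bit to make room for the
// new MonitorUpdateFailed flag. A minimal, self-contained sketch of the resulting layout and of
// how the multi-bit masks are checked, using hypothetical stand-in constants rather than the
// crate's internal ChannelState enum:
#[allow(dead_code)]
mod channel_state_flags_sketch {
    pub const PEER_DISCONNECTED: u32 = 1 << 7;
    pub const MONITOR_UPDATE_FAILED: u32 = 1 << 8; // the new flag
    pub const AWAITING_REMOTE_REVOKE: u32 = 1 << 9;
    pub const REMOTE_SHUTDOWN_SENT: u32 = 1 << 10;
    pub const LOCAL_SHUTDOWN_SENT: u32 = 1 << 11;
    pub const SHUTDOWN_COMPLETE: u32 = 1 << 12; // 4096, matching the renumbered variant below
    pub const BOTH_SIDES_SHUTDOWN_MASK: u32 = LOCAL_SHUTDOWN_SENT | REMOTE_SHUTDOWN_SENT;
    pub const MULTI_STATE_FLAGS: u32 =
        BOTH_SIDES_SHUTDOWN_MASK | PEER_DISCONNECTED | MONITOR_UPDATE_FAILED;

    /// True if any flag that pauses outbound channel messages is set.
    pub fn outbound_paused(channel_state: u32) -> bool {
        channel_state & (PEER_DISCONNECTED | MONITOR_UPDATE_FAILED) != 0
    }

    /// Strips the multi-state flags, mirroring the `non_shutdown_state` computation that the
    /// funding_locked handling below now performs with MULTI_STATE_FLAGS.
    pub fn base_funding_state(channel_state: u32) -> u32 {
        channel_state & !MULTI_STATE_FLAGS
    }
}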
- ShutdownComplete = 2048, + ShutdownComplete = 4096, } const BOTH_SIDES_SHUTDOWN_MASK: u32 = (ChannelState::LocalShutdownSent as u32 | ChannelState::RemoteShutdownSent as u32); -const MULTI_STATE_FLAGS: u32 = (BOTH_SIDES_SHUTDOWN_MASK | ChannelState::PeerDisconnected as u32); +const MULTI_STATE_FLAGS: u32 = (BOTH_SIDES_SHUTDOWN_MASK | ChannelState::PeerDisconnected as u32 | ChannelState::MonitorUpdateFailed as u32); const INITIAL_COMMITMENT_NUMBER: u64 = (1 << 48) - 1; @@ -306,6 +310,12 @@ pub(super) struct Channel { pending_outbound_htlcs: Vec, holding_cell_htlc_updates: Vec, + monitor_pending_revoke_and_ack: bool, + monitor_pending_commitment_signed: bool, + monitor_pending_order: Option, + monitor_pending_forwards: Vec<(PendingForwardHTLCInfo, u64)>, + monitor_pending_failures: Vec<(HTLCSource, [u8; 32], HTLCFailReason)>, + // pending_update_fee is filled when sending and receiving update_fee // For outbound channel, feerate_per_kw is updated with the value from // pending_update_fee when revoke_and_ack is received @@ -509,6 +519,12 @@ impl Channel { next_remote_htlc_id: 0, channel_update_count: 1, + monitor_pending_revoke_and_ack: false, + monitor_pending_commitment_signed: false, + monitor_pending_order: None, + monitor_pending_forwards: Vec::new(), + monitor_pending_failures: Vec::new(), + #[cfg(debug_assertions)] max_commitment_tx_output_local: ::std::sync::Mutex::new((channel_value_satoshis * 1000 - push_msat, push_msat)), #[cfg(debug_assertions)] @@ -666,6 +682,12 @@ impl Channel { next_remote_htlc_id: 0, channel_update_count: 1, + monitor_pending_revoke_and_ack: false, + monitor_pending_commitment_signed: false, + monitor_pending_order: None, + monitor_pending_forwards: Vec::new(), + monitor_pending_failures: Vec::new(), + #[cfg(debug_assertions)] max_commitment_tx_output_local: ::std::sync::Mutex::new((msg.push_msat, msg.funding_satoshis * 1000 - msg.push_msat)), #[cfg(debug_assertions)] @@ -1166,7 +1188,7 @@ impl Channel { // can claim it even if the channel hits the chain before we see their next commitment. self.channel_monitor.provide_payment_preimage(&payment_hash_calc, &payment_preimage_arg); - if (self.channel_state & (ChannelState::AwaitingRemoteRevoke as u32 | ChannelState::PeerDisconnected as u32)) != 0 { + if (self.channel_state & (ChannelState::AwaitingRemoteRevoke as u32 | ChannelState::PeerDisconnected as u32 | ChannelState::MonitorUpdateFailed as u32)) != 0 { for pending_update in self.holding_cell_htlc_updates.iter() { match pending_update { &HTLCUpdateAwaitingACK::ClaimHTLC { htlc_id, .. } => { @@ -1243,7 +1265,7 @@ impl Channel { } // Now update local state: - if (self.channel_state & (ChannelState::AwaitingRemoteRevoke as u32 | ChannelState::PeerDisconnected as u32)) != 0 { + if (self.channel_state & (ChannelState::AwaitingRemoteRevoke as u32 | ChannelState::PeerDisconnected as u32 | ChannelState::MonitorUpdateFailed as u32)) != 0 { for pending_update in self.holding_cell_htlc_updates.iter() { match pending_update { &HTLCUpdateAwaitingACK::ClaimHTLC { htlc_id, .. 
} => { @@ -1461,11 +1483,13 @@ impl Channel { if self.channel_state & (ChannelState::PeerDisconnected as u32) == ChannelState::PeerDisconnected as u32 { return Err(ChannelError::Close("Peer sent funding_locked when we needed a channel_reestablish")); } - let non_shutdown_state = self.channel_state & (!BOTH_SIDES_SHUTDOWN_MASK); + + let non_shutdown_state = self.channel_state & (!MULTI_STATE_FLAGS); + if non_shutdown_state == ChannelState::FundingSent as u32 { self.channel_state |= ChannelState::TheirFundingLocked as u32; } else if non_shutdown_state == (ChannelState::FundingSent as u32 | ChannelState::OurFundingLocked as u32) { - self.channel_state = ChannelState::ChannelFunded as u32 | (self.channel_state & BOTH_SIDES_SHUTDOWN_MASK); + self.channel_state = ChannelState::ChannelFunded as u32 | (self.channel_state & MULTI_STATE_FLAGS); self.channel_update_count += 1; } else if self.channel_state & (ChannelState::ChannelFunded as u32) != 0 && // Note that funding_signed/funding_created will have decremented both by 1! @@ -1685,6 +1709,11 @@ impl Channel { } } } + if self.channel_state & (ChannelState::MonitorUpdateFailed as u32) == 0 { + // This is a response to our post-monitor-failed unfreeze messages, so we can clear the + // monitor_pending_order requirement as we won't re-send the monitor_pending messages. + self.monitor_pending_order = None; + } self.channel_monitor.provide_latest_local_commitment_tx_info(local_commitment_tx.0, local_keys, self.feerate_per_kw, htlcs_and_sigs); @@ -1708,6 +1737,12 @@ impl Channel { self.last_local_commitment_txn = new_local_commitment_txn; self.received_commitment_while_awaiting_raa = (self.channel_state & (ChannelState::AwaitingRemoteRevoke as u32)) != 0; + if (self.channel_state & ChannelState::MonitorUpdateFailed as u32) != 0 { + self.monitor_pending_revoke_and_ack = true; + self.monitor_pending_commitment_signed |= need_our_commitment; + return Err(HandleError{err: "Previous monitor update failure prevented generation of RAA", action: Some(ErrorAction::IgnoreError)}); + } + let (our_commitment_signed, monitor_update) = if need_our_commitment && (self.channel_state & (ChannelState::AwaitingRemoteRevoke as u32)) == 0 { // If we're AwaitingRemoteRevoke we can't send a new commitment here, but that's ok - // we'll send one right away when we get the revoke_and_ack when we @@ -1726,6 +1761,7 @@ impl Channel { /// Used to fulfill holding_cell_htlcs when we get a remote ack (or implicitly get it by them /// fulfilling or failing the last pending HTLC) fn free_holding_cell_htlcs(&mut self) -> Result, HandleError> { + assert_eq!(self.channel_state & ChannelState::MonitorUpdateFailed as u32, 0); if self.holding_cell_htlc_updates.len() != 0 || self.holding_cell_update_fee.is_some() { let mut htlc_updates = Vec::new(); mem::swap(&mut htlc_updates, &mut self.holding_cell_htlc_updates); @@ -1827,6 +1863,7 @@ impl Channel { if self.channel_state & (ChannelState::PeerDisconnected as u32) == ChannelState::PeerDisconnected as u32 { return Err(HandleError{err: "Peer sent revoke_and_ack when we needed a channel_reestablish", action: Some(msgs::ErrorAction::SendErrorMessage{msg: msgs::ErrorMessage{data: "Peer sent revoke_and_ack when we needed a channel_reestablish".to_string(), channel_id: msg.channel_id}})}); } + if let Some(their_prev_commitment_point) = self.their_prev_commitment_point { if PublicKey::from_secret_key(&self.secp_ctx, &secp_call!(SecretKey::from_slice(&self.secp_ctx, &msg.per_commitment_secret), "Peer provided an invalid per_commitment_secret", 
self.channel_id())) != their_prev_commitment_point { return Err(HandleError{err: "Got a revoke commitment secret which didn't correspond to their current pubkey", action: None}); @@ -1843,6 +1880,11 @@ impl Channel { self.their_cur_commitment_point = Some(msg.next_per_commitment_point); self.cur_remote_commitment_transaction_number -= 1; self.received_commitment_while_awaiting_raa = false; + if self.channel_state & (ChannelState::MonitorUpdateFailed as u32) == 0 { + // This is a response to our post-monitor-failed unfreeze messages, so we can clear the + // monitor_pending_order requirement as we won't re-send the monitor_pending messages. + self.monitor_pending_order = None; + } let mut to_forward_infos = Vec::new(); let mut revoked_htlcs = Vec::new(); @@ -1934,6 +1976,17 @@ impl Channel { } } + if (self.channel_state & ChannelState::MonitorUpdateFailed as u32) == ChannelState::MonitorUpdateFailed as u32 { + // We can't actually generate a new commitment transaction (incl by freeing holding + // cells) while we can't update the monitor, so we just return what we have. + if require_commitment { + self.monitor_pending_commitment_signed = true; + } + self.monitor_pending_forwards.append(&mut to_forward_infos); + self.monitor_pending_failures.append(&mut revoked_htlcs); + return Ok((None, Vec::new(), Vec::new(), self.channel_monitor.clone())); + } + match self.free_holding_cell_htlcs()? { Some(mut commitment_update) => { commitment_update.0.update_fail_htlcs.reserve(update_fail_htlcs.len()); @@ -1976,7 +2029,7 @@ impl Channel { panic!("Cannot update fee until channel is fully established and we haven't started shutting down"); } if !self.is_live() { - panic!("Cannot update fee while peer is disconnected (ChannelManager should have caught this)"); + panic!("Cannot update fee while peer is disconnected/we're awaiting a monitor update (ChannelManager should have caught this)"); } if (self.channel_state & (ChannelState::AwaitingRemoteRevoke as u32)) == (ChannelState::AwaitingRemoteRevoke as u32) { @@ -2068,6 +2121,60 @@ impl Channel { outbound_drops } + /// Indicates that a ChannelMonitor update failed to be stored by the client and further + /// updates are partially paused. + /// This must be called immediately after the call which generated the ChannelMonitor update + /// which failed, with the order argument set to the type of call it represented (ie a + /// commitment update or a revoke_and_ack generation). The messages which were generated from + /// that original call must *not* have been sent to the remote end, and must instead have been + /// dropped. They will be regenerated when monitor_updating_restored is called. + pub fn monitor_update_failed(&mut self, order: RAACommitmentOrder) { + assert_eq!(self.channel_state & ChannelState::MonitorUpdateFailed as u32, 0); + match order { + RAACommitmentOrder::CommitmentFirst => { + self.monitor_pending_revoke_and_ack = false; + self.monitor_pending_commitment_signed = true; + }, + RAACommitmentOrder::RevokeAndACKFirst => { + self.monitor_pending_revoke_and_ack = true; + self.monitor_pending_commitment_signed = false; + }, + } + self.monitor_pending_order = Some(order); + self.channel_state |= ChannelState::MonitorUpdateFailed as u32; + } + + /// Indicates that the latest ChannelMonitor update has been committed by the client + /// successfully and we should restore normal operation. Returns messages which should be sent + /// to the remote side. 
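// A minimal sketch (hypothetical stand-in types, not the crate's API) of the pause/restore
// contract these two methods document: on a failed monitor write the just-generated messages
// are dropped and the kind of call that produced them is recorded; once the write succeeds the
// recorded messages are regenerated and sent in the recorded RAA/commitment order. The real
// callers are ChannelManager::handle_monitor_update_fail and test_restore_channel_monitor
// further down in this diff.
#[allow(dead_code)]
mod monitor_pause_sketch {
    #[derive(Clone, Copy, PartialEq)]
    pub enum Order { CommitmentFirst, RevokeAndAckFirst }

    #[derive(Default)]
    pub struct PauseState {
        paused: bool,
        pending_raa: bool,
        pending_commitment_signed: bool,
        pending_order: Option<Order>,
    }

    impl PauseState {
        // Analogue of Channel::monitor_update_failed: remember what must be re-sent later.
        pub fn update_failed(&mut self, order: Order) {
            assert!(!self.paused);
            self.pending_raa = order == Order::RevokeAndAckFirst;
            self.pending_commitment_signed = order == Order::CommitmentFirst;
            self.pending_order = Some(order);
            self.paused = true;
        }

        // Analogue of Channel::monitor_updating_restored: unpause and report what to send now,
        // and in which order.
        pub fn updating_restored(&mut self) -> (bool, bool, Order) {
            assert!(self.paused);
            self.paused = false;
            let res = (self.pending_raa, self.pending_commitment_signed,
                       self.pending_order.take().expect("set in update_failed"));
            self.pending_raa = false;
            self.pending_commitment_signed = false;
            res
        }
    }
}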
+ pub fn monitor_updating_restored(&mut self) -> (Option, Option, RAACommitmentOrder, Vec<(PendingForwardHTLCInfo, u64)>, Vec<(HTLCSource, [u8; 32], HTLCFailReason)>) { + assert_eq!(self.channel_state & ChannelState::MonitorUpdateFailed as u32, ChannelState::MonitorUpdateFailed as u32); + self.channel_state &= !(ChannelState::MonitorUpdateFailed as u32); + + let mut forwards = Vec::new(); + mem::swap(&mut forwards, &mut self.monitor_pending_forwards); + let mut failures = Vec::new(); + mem::swap(&mut failures, &mut self.monitor_pending_failures); + + if self.channel_state & (ChannelState::PeerDisconnected as u32) != 0 { + // Leave monitor_pending_order so we can order our channel_reestablish responses + self.monitor_pending_revoke_and_ack = false; + self.monitor_pending_commitment_signed = false; + return (None, None, RAACommitmentOrder::RevokeAndACKFirst, forwards, failures); + } + + let raa = if self.monitor_pending_revoke_and_ack { + Some(self.get_last_revoke_and_ack()) + } else { None }; + let commitment_update = if self.monitor_pending_commitment_signed { + Some(self.get_last_commitment_update()) + } else { None }; + + self.monitor_pending_revoke_and_ack = false; + self.monitor_pending_commitment_signed = false; + (raa, commitment_update, self.monitor_pending_order.clone().unwrap(), forwards, failures) + } + pub fn update_fee(&mut self, fee_estimator: &FeeEstimator, msg: &msgs::UpdateFee) -> Result<(), ChannelError> { if self.channel_outbound { return Err(ChannelError::Close("Non-funding remote tried to update channel fee")); @@ -2082,6 +2189,71 @@ impl Channel { Ok(()) } + fn get_last_revoke_and_ack(&self) -> msgs::RevokeAndACK { + let next_per_commitment_point = PublicKey::from_secret_key(&self.secp_ctx, &self.build_local_commitment_secret(self.cur_local_commitment_transaction_number)); + let per_commitment_secret = chan_utils::build_commitment_secret(self.local_keys.commitment_seed, self.cur_local_commitment_transaction_number + 2); + msgs::RevokeAndACK { + channel_id: self.channel_id, + per_commitment_secret, + next_per_commitment_point, + } + } + + fn get_last_commitment_update(&self) -> msgs::CommitmentUpdate { + let mut update_add_htlcs = Vec::new(); + let mut update_fulfill_htlcs = Vec::new(); + let mut update_fail_htlcs = Vec::new(); + let mut update_fail_malformed_htlcs = Vec::new(); + + for htlc in self.pending_outbound_htlcs.iter() { + if let &OutboundHTLCState::LocalAnnounced(ref onion_packet) = &htlc.state { + update_add_htlcs.push(msgs::UpdateAddHTLC { + channel_id: self.channel_id(), + htlc_id: htlc.htlc_id, + amount_msat: htlc.amount_msat, + payment_hash: htlc.payment_hash, + cltv_expiry: htlc.cltv_expiry, + onion_routing_packet: (**onion_packet).clone(), + }); + } + } + + for htlc in self.pending_inbound_htlcs.iter() { + if let &InboundHTLCState::LocalRemoved(ref reason) = &htlc.state { + match reason { + &InboundHTLCRemovalReason::FailRelay(ref err_packet) => { + update_fail_htlcs.push(msgs::UpdateFailHTLC { + channel_id: self.channel_id(), + htlc_id: htlc.htlc_id, + reason: err_packet.clone() + }); + }, + &InboundHTLCRemovalReason::FailMalformed((ref sha256_of_onion, ref failure_code)) => { + update_fail_malformed_htlcs.push(msgs::UpdateFailMalformedHTLC { + channel_id: self.channel_id(), + htlc_id: htlc.htlc_id, + sha256_of_onion: sha256_of_onion.clone(), + failure_code: failure_code.clone(), + }); + }, + &InboundHTLCRemovalReason::Fulfill(ref payment_preimage) => { + update_fulfill_htlcs.push(msgs::UpdateFulfillHTLC { + channel_id: self.channel_id(), + htlc_id: 
htlc.htlc_id, + payment_preimage: payment_preimage.clone(), + }); + }, + } + } + } + + msgs::CommitmentUpdate { + update_add_htlcs, update_fulfill_htlcs, update_fail_htlcs, update_fail_malformed_htlcs, + update_fee: None, //TODO: We need to support re-generating any update_fees in the last commitment_signed! + commitment_signed: self.send_commitment_no_state_update().expect("It looks like we failed to re-generate a commitment_signed we had previously sent?").0, + } + } + /// May panic if some calls other than message-handling calls (which will all Err immediately) /// have been called between remove_uncommitted_htlcs_and_mark_paused and this call. pub fn channel_reestablish(&mut self, msg: &msgs::ChannelReestablish) -> Result<(Option, Option, Option, Option, RAACommitmentOrder), ChannelError> { @@ -2106,13 +2278,12 @@ impl Channel { // Note that if we need to repeat our FundingLocked we'll do that in the next if block. None } else if msg.next_remote_commitment_number == (INITIAL_COMMITMENT_NUMBER - 1) - self.cur_local_commitment_transaction_number { - let next_per_commitment_point = PublicKey::from_secret_key(&self.secp_ctx, &self.build_local_commitment_secret(self.cur_local_commitment_transaction_number)); - let per_commitment_secret = chan_utils::build_commitment_secret(self.local_keys.commitment_seed, self.cur_local_commitment_transaction_number + 2); - Some(msgs::RevokeAndACK { - channel_id: self.channel_id, - per_commitment_secret, - next_per_commitment_point, - }) + if self.channel_state & (ChannelState::MonitorUpdateFailed as u32) != 0 { + self.monitor_pending_revoke_and_ack = true; + None + } else { + Some(self.get_last_revoke_and_ack()) + } } else { return Err(ChannelError::Close("Peer attempted to reestablish channel with a very old local commitment transaction")); }; @@ -2124,6 +2295,7 @@ impl Channel { let our_next_remote_commitment_number = INITIAL_COMMITMENT_NUMBER - self.cur_remote_commitment_transaction_number + if (self.channel_state & ChannelState::AwaitingRemoteRevoke as u32) != 0 { 1 } else { 0 }; let resend_funding_locked = if msg.next_local_commitment_number == 1 && INITIAL_COMMITMENT_NUMBER - self.cur_local_commitment_transaction_number == 1 { + // We should never have to worry about MonitorUpdateFailed resending FundingLocked let next_per_commitment_secret = self.build_local_commitment_secret(self.cur_local_commitment_transaction_number); let next_per_commitment_point = PublicKey::from_secret_key(&self.secp_ctx, &next_per_commitment_secret); Some(msgs::FundingLocked { @@ -2132,11 +2304,11 @@ impl Channel { }) } else { None }; - let order = if self.received_commitment_while_awaiting_raa { - RAACommitmentOrder::CommitmentFirst - } else { - RAACommitmentOrder::RevokeAndACKFirst - }; + let order = self.monitor_pending_order.clone().unwrap_or(if self.received_commitment_while_awaiting_raa { + RAACommitmentOrder::CommitmentFirst + } else { + RAACommitmentOrder::RevokeAndACKFirst + }); if msg.next_local_commitment_number == our_next_remote_commitment_number { if required_revoke.is_some() { @@ -2145,7 +2317,8 @@ impl Channel { log_debug!(self, "Reconnected channel {} with no loss", log_bytes!(self.channel_id())); } - if (self.channel_state & (ChannelState::AwaitingRemoteRevoke as u32)) == 0 { + if (self.channel_state & (ChannelState::AwaitingRemoteRevoke as u32 | ChannelState::MonitorUpdateFailed as u32)) == 0 && + self.monitor_pending_order.is_none() { // monitor_pending_order indicates we're waiting on a response to a unfreeze // We're up-to-date and not waiting on a 
remote revoke (if we are our // channel_reestablish should result in them sending a revoke_and_ack), but we may // have received some updates while we were disconnected. Free the holding cell @@ -2172,59 +2345,16 @@ impl Channel { } else { log_debug!(self, "Reconnected channel {} with only lost remote commitment tx", log_bytes!(self.channel_id())); } - let mut update_add_htlcs = Vec::new(); - let mut update_fulfill_htlcs = Vec::new(); - let mut update_fail_htlcs = Vec::new(); - let mut update_fail_malformed_htlcs = Vec::new(); - - for htlc in self.pending_outbound_htlcs.iter() { - if let &OutboundHTLCState::LocalAnnounced(ref onion_packet) = &htlc.state { - update_add_htlcs.push(msgs::UpdateAddHTLC { - channel_id: self.channel_id(), - htlc_id: htlc.htlc_id, - amount_msat: htlc.amount_msat, - payment_hash: htlc.payment_hash, - cltv_expiry: htlc.cltv_expiry, - onion_routing_packet: (**onion_packet).clone(), - }); - } - } - for htlc in self.pending_inbound_htlcs.iter() { - if let &InboundHTLCState::LocalRemoved(ref reason) = &htlc.state { - match reason { - &InboundHTLCRemovalReason::FailRelay(ref err_packet) => { - update_fail_htlcs.push(msgs::UpdateFailHTLC { - channel_id: self.channel_id(), - htlc_id: htlc.htlc_id, - reason: err_packet.clone() - }); - }, - &InboundHTLCRemovalReason::FailMalformed((ref sha256_of_onion, ref failure_code)) => { - update_fail_malformed_htlcs.push(msgs::UpdateFailMalformedHTLC { - channel_id: self.channel_id(), - htlc_id: htlc.htlc_id, - sha256_of_onion: sha256_of_onion.clone(), - failure_code: failure_code.clone(), - }); - }, - &InboundHTLCRemovalReason::Fulfill(ref payment_preimage) => { - update_fulfill_htlcs.push(msgs::UpdateFulfillHTLC { - channel_id: self.channel_id(), - htlc_id: htlc.htlc_id, - payment_preimage: payment_preimage.clone(), - }); - }, - } - } + // If monitor_pending_order is set, it must be CommitmentSigned if we have no RAA + debug_assert!(self.monitor_pending_order != Some(RAACommitmentOrder::RevokeAndACKFirst) || required_revoke.is_some()); + + if self.channel_state & (ChannelState::MonitorUpdateFailed as u32) != 0 { + self.monitor_pending_commitment_signed = true; + return Ok((resend_funding_locked, None, None, None, order)); } - return Ok((resend_funding_locked, required_revoke, - Some(msgs::CommitmentUpdate { - update_add_htlcs, update_fulfill_htlcs, update_fail_htlcs, update_fail_malformed_htlcs, - update_fee: None, //TODO: We need to support re-generating any update_fees in the last commitment_signed! - commitment_signed: self.send_commitment_no_state_update().expect("It looks like we failed to re-generate a commitment_signed we had previously sent?").0, - }), None, order)); + return Ok((resend_funding_locked, required_revoke, Some(self.get_last_commitment_update()), None, order)); } else { return Err(ChannelError::Close("Peer attempted to reestablish channel with a very old remote commitment transaction")); } @@ -2561,7 +2691,13 @@ impl Channel { /// is_usable() and considers things like the channel being temporarily disabled. /// Allowed in any state (including after shutdown) pub fn is_live(&self) -> bool { - self.is_usable() && (self.channel_state & (ChannelState::PeerDisconnected as u32) == 0) + self.is_usable() && (self.channel_state & (ChannelState::PeerDisconnected as u32 | ChannelState::MonitorUpdateFailed as u32) == 0) + } + + /// Returns true if this channel has been marked as awaiting a monitor update to move forward. 
+ /// Allowed in any state (including after shutdown) + pub fn is_awaiting_monitor_update(&self) -> bool { + (self.channel_state & ChannelState::MonitorUpdateFailed as u32) != 0 } /// Returns true if funding_created was sent/received. @@ -2875,14 +3011,14 @@ impl Channel { return Err(HandleError{err: "Cannot send less than their minimum HTLC value", action: None}); } - if (self.channel_state & (ChannelState::PeerDisconnected as u32)) == (ChannelState::PeerDisconnected as u32) { + if (self.channel_state & (ChannelState::PeerDisconnected as u32 | ChannelState::MonitorUpdateFailed as u32)) != 0 { // Note that this should never really happen, if we're !is_live() on receipt of an // incoming HTLC for relay will result in us rejecting the HTLC and we won't allow // the user to send directly into a !is_live() channel. However, if we // disconnected during the time the previous hop was doing the commitment dance we may // end up getting here after the forwarding delay. In any case, returning an // IgnoreError will get ChannelManager to do the right thing and fail backwards now. - return Err(HandleError{err: "Cannot send an HTLC while disconnected", action: Some(ErrorAction::IgnoreError)}); + return Err(HandleError{err: "Cannot send an HTLC while disconnected/frozen for channel monitor update", action: Some(ErrorAction::IgnoreError)}); } let (outbound_htlc_count, htlc_outbound_value_msat) = self.get_outbound_pending_htlc_stats(); @@ -2964,6 +3100,9 @@ impl Channel { if (self.channel_state & (ChannelState::PeerDisconnected as u32)) == (ChannelState::PeerDisconnected as u32) { panic!("Cannot create commitment tx while disconnected, as send_htlc will have returned an Err so a send_commitment precondition has been violated"); } + if (self.channel_state & (ChannelState::MonitorUpdateFailed as u32)) == (ChannelState::MonitorUpdateFailed as u32) { + panic!("Cannot create commitment tx while awaiting monitor update unfreeze, as send_htlc will have returned an Err so a send_commitment precondition has been violated"); + } let mut have_updates = self.pending_update_fee.is_some(); for htlc in self.pending_outbound_htlcs.iter() { if let OutboundHTLCState::LocalAnnounced(_) = htlc.state { @@ -3072,8 +3211,8 @@ impl Channel { } } assert_eq!(self.channel_state & ChannelState::ShutdownComplete as u32, 0); - if self.channel_state & (ChannelState::PeerDisconnected as u32) == ChannelState::PeerDisconnected as u32 { - return Err(APIError::ChannelUnavailable{err: "Cannot begin shutdown while peer is disconnected, maybe force-close instead?"}); + if self.channel_state & (ChannelState::PeerDisconnected as u32 | ChannelState::MonitorUpdateFailed as u32) != 0 { + return Err(APIError::ChannelUnavailable{err: "Cannot begin shutdown while peer is disconnected or we're waiting on a monitor update, maybe force-close instead?"}); } let our_closing_script = self.get_closing_scriptpubkey(); diff --git a/src/ln/channelmanager.rs b/src/ln/channelmanager.rs index 774fdf6a..d6ba67c2 100644 --- a/src/ln/channelmanager.rs +++ b/src/ln/channelmanager.rs @@ -23,7 +23,7 @@ use secp256k1; use chain::chaininterface::{BroadcasterInterface,ChainListener,ChainWatchInterface,FeeEstimator}; use chain::transaction::OutPoint; use ln::channel::{Channel, ChannelError, ChannelKeys}; -use ln::channelmonitor::{ManyChannelMonitor, CLTV_CLAIM_BUFFER, HTLC_FAIL_TIMEOUT_BLOCKS}; +use ln::channelmonitor::{ChannelMonitorUpdateErr, ManyChannelMonitor, CLTV_CLAIM_BUFFER, HTLC_FAIL_TIMEOUT_BLOCKS}; use ln::router::{Route,RouteHop}; use ln::msgs; use
ln::msgs::{ChannelMessageHandler, HandleError, RAACommitmentOrder}; @@ -586,6 +586,33 @@ impl ChannelManager { } } + fn handle_monitor_update_fail(&self, mut channel_state_lock: MutexGuard, channel_id: &[u8; 32], err: ChannelMonitorUpdateErr, reason: RAACommitmentOrder) { + match err { + ChannelMonitorUpdateErr::PermanentFailure => { + let mut chan = { + let channel_state = channel_state_lock.borrow_parts(); + let chan = channel_state.by_id.remove(channel_id).expect("monitor_update_failed must be called within the same lock as the channel get!"); + if let Some(short_id) = chan.get_short_channel_id() { + channel_state.short_to_id.remove(&short_id); + } + chan + }; + mem::drop(channel_state_lock); + self.finish_force_close_channel(chan.force_shutdown()); + let mut events = self.pending_events.lock().unwrap(); + if let Ok(update) = self.get_channel_update(&chan) { + events.push(events::Event::BroadcastChannelUpdate { + msg: update + }); + } + }, + ChannelMonitorUpdateErr::TemporaryFailure => { + let channel = channel_state_lock.by_id.get_mut(channel_id).expect("monitor_update_failed must be called within the same lock as the channel get!"); + channel.monitor_update_failed(reason); + }, + } + } + #[inline] fn gen_rho_mu_from_shared_secret(shared_secret: &SharedSecret) -> ([u8; 32], [u8; 32]) { ({ @@ -984,6 +1011,11 @@ impl ChannelManager { if let Some((err, code, chan_update)) = loop { let chan = channel_state.as_mut().unwrap().by_id.get_mut(&forwarding_id).unwrap(); + // Note that we could technically not return an error yet here and just hope + // that the connection is reestablished or monitor updated by the time we get + // around to doing the actual forward, but better to fail early if we can and + // hopefully an attacker trying to path-trace payments cannot make this occur + // on a small/per-node/per-channel scale. if !chan.is_live() { // channel_disabled break Some(("Forwarding channel is not in a ready state.", 0x1000 | 20, Some(self.get_channel_update(chan).unwrap()))); } @@ -1027,6 +1059,7 @@ impl ChannelManager { } /// only fails if the channel does not yet have an assigned short_id + /// May be called with channel_state already locked! 
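// A minimal sketch (hypothetical stand-in types) of the policy implemented by
// handle_monitor_update_fail above and by test_restore_channel_monitor below: a
// PermanentFailure force-closes the channel (and broadcasts a disabling channel_update), while
// a TemporaryFailure merely freezes it via Channel::monitor_update_failed until the monitor
// write is retried successfully.
#[allow(dead_code)]
mod monitor_err_dispatch_sketch {
    pub enum MonitorUpdateErr { PermanentFailure, TemporaryFailure }

    pub enum Action {
        /// Remove the channel, force-close it on chain and broadcast a channel_update.
        ForceClose,
        /// Keep the channel but pause outbound updates until the monitor write succeeds.
        FreezeUntilRestored,
    }

    pub fn on_monitor_write_failure(err: MonitorUpdateErr) -> Action {
        match err {
            MonitorUpdateErr::PermanentFailure => Action::ForceClose,
            MonitorUpdateErr::TemporaryFailure => Action::FreezeUntilRestored,
        }
    }
}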
fn get_channel_update(&self, chan: &Channel) -> Result { let short_channel_id = match chan.get_short_channel_id() { None => return Err(HandleError{err: "Channel not yet established", action: None}), @@ -1096,9 +1129,8 @@ impl ChannelManager { let (onion_payloads, htlc_msat, htlc_cltv) = ChannelManager::build_onion_payloads(&route, cur_height)?; let onion_packet = ChannelManager::construct_onion_packet(onion_payloads, onion_keys, &payment_hash); - let (first_hop_node_id, (update_add, commitment_signed, chan_monitor)) = { - let mut channel_state_lock = self.channel_state.lock().unwrap(); - let channel_state = channel_state_lock.borrow_parts(); + let (first_hop_node_id, update_add, commitment_signed) = { + let mut channel_state = self.channel_state.lock().unwrap(); let id = match channel_state.short_to_id.get(&route.hops.first().unwrap().short_channel_id) { None => return Err(APIError::ChannelUnavailable{err: "No channel available with first hop!"}), @@ -1106,32 +1138,45 @@ impl ChannelManager { }; let res = { - let chan = channel_state.by_id.get_mut(&id).unwrap(); - if chan.get_their_node_id() != route.hops.first().unwrap().pubkey { - return Err(APIError::RouteError{err: "Node ID mismatch on first hop!"}); - } - if !chan.is_live() { - return Err(APIError::ChannelUnavailable{err: "Peer for first hop currently disconnected!"}); + let res = { + let chan = channel_state.by_id.get_mut(&id).unwrap(); + if chan.get_their_node_id() != route.hops.first().unwrap().pubkey { + return Err(APIError::RouteError{err: "Node ID mismatch on first hop!"}); + } + if chan.is_awaiting_monitor_update() { + return Err(APIError::MonitorUpdateFailed); + } + if !chan.is_live() { + return Err(APIError::ChannelUnavailable{err: "Peer for first hop currently disconnected!"}); + } + chan.send_htlc_and_commit(htlc_msat, payment_hash.clone(), htlc_cltv, HTLCSource::OutboundRoute { + route: route.clone(), + session_priv: session_priv.clone(), + first_hop_htlc_msat: htlc_msat, + }, onion_packet).map_err(|he| APIError::ChannelUnavailable{err: he.err})? + }; + match res { + Some((update_add, commitment_signed, chan_monitor)) => { + if let Err(e) = self.monitor.add_update_monitor(chan_monitor.get_funding_txo().unwrap(), chan_monitor) { + self.handle_monitor_update_fail(channel_state, &id, e, RAACommitmentOrder::CommitmentFirst); + return Err(APIError::MonitorUpdateFailed); + } + Some((update_add, commitment_signed)) + }, + None => None, } - chan.send_htlc_and_commit(htlc_msat, payment_hash.clone(), htlc_cltv, HTLCSource::OutboundRoute { - route: route.clone(), - session_priv: session_priv.clone(), - first_hop_htlc_msat: htlc_msat, - }, onion_packet).map_err(|he| APIError::ChannelUnavailable{err: he.err})? 
}; let first_hop_node_id = route.hops.first().unwrap().pubkey; match res { - Some(msgs) => (first_hop_node_id, msgs), + Some((update_add, commitment_signed)) => { + (first_hop_node_id, update_add, commitment_signed) + }, None => return Ok(()), } }; - if let Err(_e) = self.monitor.add_update_monitor(chan_monitor.get_funding_txo().unwrap(), chan_monitor) { - unimplemented!(); - } - let mut events = self.pending_events.lock().unwrap(); events.push(events::Event::UpdateHTLCs { node_id: first_hop_node_id, @@ -1184,7 +1229,9 @@ impl ChannelManager { }, None => return } - }; // Release channel lock for install_watch_outpoint call, + }; + // Because we have exclusive ownership of the channel here we can release the channel_state + // lock before add_update_monitor if let Err(_e) = self.monitor.add_update_monitor(chan_monitor.get_funding_txo().unwrap(), chan_monitor) { unimplemented!(); } @@ -1299,7 +1346,10 @@ impl ChannelManager { continue; }, }; - new_events.push((Some(monitor), events::Event::UpdateHTLCs { + if let Err(_e) = self.monitor.add_update_monitor(monitor.get_funding_txo().unwrap(), monitor) { + unimplemented!();// but def dont push the event... + } + new_events.push(events::Event::UpdateHTLCs { node_id: forward_chan.get_their_node_id(), updates: msgs::CommitmentUpdate { update_add_htlcs: add_htlc_msgs, @@ -1309,7 +1359,7 @@ impl ChannelManager { update_fee: None, commitment_signed: commitment_msg, }, - })); + }); } } else { for HTLCForwardInfo { prev_short_channel_id, prev_htlc_id, forward_info } in pending_forwards.drain(..) { @@ -1322,10 +1372,10 @@ impl ChannelManager { hash_map::Entry::Occupied(mut entry) => entry.get_mut().push(prev_hop_data), hash_map::Entry::Vacant(entry) => { entry.insert(vec![prev_hop_data]); }, }; - new_events.push((None, events::Event::PaymentReceived { + new_events.push(events::Event::PaymentReceived { payment_hash: forward_info.payment_hash, amt: forward_info.amt_to_forward, - })); + }); } } } @@ -1339,21 +1389,8 @@ impl ChannelManager { } if new_events.is_empty() { return } - - new_events.retain(|event| { - if let &Some(ref monitor) = &event.0 { - if let Err(_e) = self.monitor.add_update_monitor(monitor.get_funding_txo().unwrap(), monitor.clone()) { - unimplemented!();// but def dont push the event... - } - } - true - }); - let mut events = self.pending_events.lock().unwrap(); - events.reserve(new_events.len()); - for event in new_events.drain(..) { - events.push(event.1); - } + events.append(&mut new_events); } /// Indicates that the preimage for payment_hash is unknown after a PaymentReceived event. @@ -1416,7 +1453,13 @@ impl ChannelManager { let chan = channel_state.by_id.get_mut(&chan_id).unwrap(); match chan.get_update_fail_htlc_and_commit(htlc_id, err_packet) { - Ok(msg) => (chan.get_their_node_id(), msg), + Ok(Some((msg, commitment_msg, chan_monitor))) => { + if let Err(_e) = self.monitor.add_update_monitor(chan_monitor.get_funding_txo().unwrap(), chan_monitor) { + unimplemented!(); + } + (chan.get_their_node_id(), Some((msg, commitment_msg))) + }, + Ok(None) => (chan.get_their_node_id(), None), Err(_e) => { //TODO: Do something with e? return; @@ -1425,13 +1468,9 @@ impl ChannelManager { }; match fail_msgs { - Some((msg, commitment_msg, chan_monitor)) => { + Some((msg, commitment_msg)) => { mem::drop(channel_state); - if let Err(_e) = self.monitor.add_update_monitor(chan_monitor.get_funding_txo().unwrap(), chan_monitor) { - unimplemented!();// but def dont push the event... 
- } - let mut pending_events = self.pending_events.lock().unwrap(); pending_events.push(events::Event::UpdateHTLCs { node_id, @@ -1496,7 +1535,13 @@ impl ChannelManager { let chan = channel_state.by_id.get_mut(&chan_id).unwrap(); match chan.get_update_fulfill_htlc_and_commit(htlc_id, payment_preimage) { - Ok(msg) => (chan.get_their_node_id(), msg), + Ok((msgs, Some(chan_monitor))) => { + if let Err(_e) = self.monitor.add_update_monitor(chan_monitor.get_funding_txo().unwrap(), chan_monitor) { + unimplemented!();// but def dont push the event... + } + (chan.get_their_node_id(), msgs) + }, + Ok((msgs, None)) => (chan.get_their_node_id(), msgs), Err(_e) => { // TODO: There is probably a channel manager somewhere that needs to // learn the preimage as the channel may be about to hit the chain. @@ -1507,13 +1552,7 @@ impl ChannelManager { }; mem::drop(channel_state); - if let Some(chan_monitor) = fulfill_msgs.1 { - if let Err(_e) = self.monitor.add_update_monitor(chan_monitor.get_funding_txo().unwrap(), chan_monitor) { - unimplemented!();// but def dont push the event... - } - } - - if let Some((msg, commitment_msg)) = fulfill_msgs.0 { + if let Some((msg, commitment_msg)) = fulfill_msgs { let mut pending_events = self.pending_events.lock().unwrap(); pending_events.push(events::Event::UpdateHTLCs { node_id: node_id, @@ -1540,7 +1579,83 @@ impl ChannelManager { /// ChannelMonitorUpdateErr::TemporaryFailure was returned from a channel monitor update /// operation. pub fn test_restore_channel_monitor(&self) { - unimplemented!(); + let mut new_events = Vec::new(); + let mut close_results = Vec::new(); + let mut htlc_forwards = Vec::new(); + let mut htlc_failures = Vec::new(); + + { + let mut channel_lock = self.channel_state.lock().unwrap(); + let channel_state = channel_lock.borrow_parts(); + let short_to_id = channel_state.short_to_id; + channel_state.by_id.retain(|_, channel| { + if channel.is_awaiting_monitor_update() { + let chan_monitor = channel.channel_monitor(); + if let Err(e) = self.monitor.add_update_monitor(chan_monitor.get_funding_txo().unwrap(), chan_monitor) { + match e { + ChannelMonitorUpdateErr::PermanentFailure => { + if let Some(short_id) = channel.get_short_channel_id() { + short_to_id.remove(&short_id); + } + close_results.push(channel.force_shutdown()); + if let Ok(update) = self.get_channel_update(&channel) { + new_events.push(events::Event::BroadcastChannelUpdate { + msg: update + }); + } + false + }, + ChannelMonitorUpdateErr::TemporaryFailure => true, + } + } else { + let (raa, commitment_update, order, pending_forwards, mut pending_failures) = channel.monitor_updating_restored(); + if !pending_forwards.is_empty() { + htlc_forwards.push((channel.get_short_channel_id().expect("We can't have pending forwards before funding confirmation"), pending_forwards)); + } + htlc_failures.append(&mut pending_failures); + + macro_rules! handle_cs { () => { + if let Some(update) = commitment_update { + new_events.push(events::Event::UpdateHTLCs { + node_id: channel.get_their_node_id(), + updates: update, + }); + } + } } + macro_rules! handle_raa { () => { + if let Some(revoke_and_ack) = raa { + new_events.push(events::Event::SendRevokeAndACK { + node_id: channel.get_their_node_id(), + msg: revoke_and_ack, + }); + } + } } + match order { + RAACommitmentOrder::CommitmentFirst => { + handle_cs!(); + handle_raa!(); + }, + RAACommitmentOrder::RevokeAndACKFirst => { + handle_raa!(); + handle_cs!(); + }, + } + true + } + } else { true } + }); + } + + for failure in htlc_failures.drain(..) 
{ + self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), failure.0, &failure.1, failure.2); + } + self.forward_htlcs(&mut htlc_forwards[..]); + + for res in close_results.drain(..) { + self.finish_force_close_channel(res); + } + + self.pending_events.lock().unwrap().append(&mut new_events); } fn internal_open_channel(&self, their_node_id: &PublicKey, msg: &msgs::OpenChannel) -> Result { @@ -1626,10 +1741,9 @@ impl ChannelManager { }, hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel", msg.temporary_channel_id)) } - }; // Release channel lock for install_watch_outpoint call, - // note that this means if the remote end is misbehaving and sends a message for the same - // channel back-to-back with funding_created, we'll end up thinking they sent a message - // for a bogus channel. + }; + // Because we have exclusive ownership of the channel here we can release the channel_state + // lock before add_update_monitor if let Err(_e) = self.monitor.add_update_monitor(monitor_update.get_funding_txo().unwrap(), monitor_update) { unimplemented!(); } @@ -1646,7 +1760,7 @@ impl ChannelManager { } fn internal_funding_signed(&self, their_node_id: &PublicKey, msg: &msgs::FundingSigned) -> Result<(), MsgHandleErrInternal> { - let (funding_txo, user_id, monitor) = { + let (funding_txo, user_id) = { let mut channel_state = self.channel_state.lock().unwrap(); match channel_state.by_id.get_mut(&msg.channel_id) { Some(chan) => { @@ -1655,14 +1769,14 @@ impl ChannelManager { return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!", msg.channel_id)); } let chan_monitor = chan.funding_signed(&msg).map_err(|e| MsgHandleErrInternal::from_maybe_close(e))?; - (chan.get_funding_txo().unwrap(), chan.get_user_id(), chan_monitor) + if let Err(_e) = self.monitor.add_update_monitor(chan_monitor.get_funding_txo().unwrap(), chan_monitor) { + unimplemented!(); + } + (chan.get_funding_txo().unwrap(), chan.get_user_id()) }, None => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel", msg.channel_id)) } }; - if let Err(_e) = self.monitor.add_update_monitor(monitor.get_funding_txo().unwrap(), monitor) { - unimplemented!(); - } let mut pending_events = self.pending_events.lock().unwrap(); pending_events.push(events::Event::FundingBroadcastSafe { funding_txo: funding_txo, @@ -2042,7 +2156,7 @@ impl ChannelManager { } fn internal_commitment_signed(&self, their_node_id: &PublicKey, msg: &msgs::CommitmentSigned) -> Result<(msgs::RevokeAndACK, Option), MsgHandleErrInternal> { - let (revoke_and_ack, commitment_signed, chan_monitor) = { + let (revoke_and_ack, commitment_signed) = { let mut channel_state = self.channel_state.lock().unwrap(); match channel_state.by_id.get_mut(&msg.channel_id) { Some(chan) => { @@ -2050,20 +2164,53 @@ impl ChannelManager { //TODO: here and below MsgHandleErrInternal, #153 case return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!", msg.channel_id)); } - chan.commitment_signed(&msg).map_err(|e| MsgHandleErrInternal::from_maybe_close(e))? 
+ let (revoke_and_ack, commitment_signed, chan_monitor) = chan.commitment_signed(&msg).map_err(|e| MsgHandleErrInternal::from_maybe_close(e))?; + if let Err(_e) = self.monitor.add_update_monitor(chan_monitor.get_funding_txo().unwrap(), chan_monitor) { + unimplemented!(); + } + (revoke_and_ack, commitment_signed) }, None => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel", msg.channel_id)) } }; - if let Err(_e) = self.monitor.add_update_monitor(chan_monitor.get_funding_txo().unwrap(), chan_monitor) { - unimplemented!(); - } - Ok((revoke_and_ack, commitment_signed)) } + #[inline] + fn forward_htlcs(&self, per_source_pending_forwards: &mut [(u64, Vec<(PendingForwardHTLCInfo, u64)>)]) { + for &mut (prev_short_channel_id, ref mut pending_forwards) in per_source_pending_forwards { + let mut forward_event = None; + if !pending_forwards.is_empty() { + let mut channel_state = self.channel_state.lock().unwrap(); + if channel_state.forward_htlcs.is_empty() { + forward_event = Some(Instant::now() + Duration::from_millis(((rng::rand_f32() * 4.0 + 1.0) * MIN_HTLC_RELAY_HOLDING_CELL_MILLIS as f32) as u64)); + channel_state.next_forward = forward_event.unwrap(); + } + for (forward_info, prev_htlc_id) in pending_forwards.drain(..) { + match channel_state.forward_htlcs.entry(forward_info.short_channel_id) { + hash_map::Entry::Occupied(mut entry) => { + entry.get_mut().push(HTLCForwardInfo { prev_short_channel_id, prev_htlc_id, forward_info }); + }, + hash_map::Entry::Vacant(entry) => { + entry.insert(vec!(HTLCForwardInfo { prev_short_channel_id, prev_htlc_id, forward_info })); + } + } + } + } + match forward_event { + Some(time) => { + let mut pending_events = self.pending_events.lock().unwrap(); + pending_events.push(events::Event::PendingHTLCsForwardable { + time_forwardable: time + }); + } + None => {}, + } + } + } + fn internal_revoke_and_ack(&self, their_node_id: &PublicKey, msg: &msgs::RevokeAndACK) -> Result, MsgHandleErrInternal> { - let ((res, mut pending_forwards, mut pending_failures, chan_monitor), short_channel_id) = { + let ((res, pending_forwards, mut pending_failures), short_channel_id) = { let mut channel_state = self.channel_state.lock().unwrap(); match channel_state.by_id.get_mut(&msg.channel_id) { Some(chan) => { @@ -2071,45 +2218,19 @@ impl ChannelManager { //TODO: here and below MsgHandleErrInternal, #153 case return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!", msg.channel_id)); } - (chan.revoke_and_ack(&msg).map_err(|e| MsgHandleErrInternal::from_maybe_close(e))?, chan.get_short_channel_id().expect("RAA should only work on a short-id-available channel")) + let (res, pending_forwards, pending_failures, chan_monitor) = chan.revoke_and_ack(&msg).map_err(|e| MsgHandleErrInternal::from_maybe_close(e))?; + if let Err(_e) = self.monitor.add_update_monitor(chan_monitor.get_funding_txo().unwrap(), chan_monitor) { + unimplemented!(); + } + ((res, pending_forwards, pending_failures), chan.get_short_channel_id().expect("RAA should only work on a short-id-available channel")) }, None => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel", msg.channel_id)) } }; - if let Err(_e) = self.monitor.add_update_monitor(chan_monitor.get_funding_txo().unwrap(), chan_monitor) { - unimplemented!(); - } for failure in pending_failures.drain(..) 
{ self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), failure.0, &failure.1, failure.2); } - - let mut forward_event = None; - if !pending_forwards.is_empty() { - let mut channel_state = self.channel_state.lock().unwrap(); - if channel_state.forward_htlcs.is_empty() { - forward_event = Some(Instant::now() + Duration::from_millis(((rng::rand_f32() * 4.0 + 1.0) * MIN_HTLC_RELAY_HOLDING_CELL_MILLIS as f32) as u64)); - channel_state.next_forward = forward_event.unwrap(); - } - for (forward_info, prev_htlc_id) in pending_forwards.drain(..) { - match channel_state.forward_htlcs.entry(forward_info.short_channel_id) { - hash_map::Entry::Occupied(mut entry) => { - entry.get_mut().push(HTLCForwardInfo { prev_short_channel_id: short_channel_id, prev_htlc_id, forward_info }); - }, - hash_map::Entry::Vacant(entry) => { - entry.insert(vec!(HTLCForwardInfo { prev_short_channel_id: short_channel_id, prev_htlc_id, forward_info })); - } - } - } - } - match forward_event { - Some(time) => { - let mut pending_events = self.pending_events.lock().unwrap(); - pending_events.push(events::Event::PendingHTLCsForwardable { - time_forwardable: time - }); - } - None => {}, - } + self.forward_htlcs(&mut [(short_channel_id, pending_forwards)]); Ok(res) } @@ -2169,7 +2290,7 @@ impl ChannelManager { } fn internal_channel_reestablish(&self, their_node_id: &PublicKey, msg: &msgs::ChannelReestablish) -> Result<(Option, Option, Option, RAACommitmentOrder), MsgHandleErrInternal> { - let (res, chan_monitor) = { + let res = { let mut channel_state = self.channel_state.lock().unwrap(); match channel_state.by_id.get_mut(&msg.channel_id) { Some(chan) => { @@ -2178,16 +2299,17 @@ impl ChannelManager { } let (funding_locked, revoke_and_ack, commitment_update, channel_monitor, order) = chan.channel_reestablish(msg) .map_err(|e| MsgHandleErrInternal::from_chan_maybe_close(e, msg.channel_id))?; - (Ok((funding_locked, revoke_and_ack, commitment_update, order)), channel_monitor) + if let Some(monitor) = channel_monitor { + if let Err(_e) = self.monitor.add_update_monitor(monitor.get_funding_txo().unwrap(), monitor) { + unimplemented!(); + } + } + Ok((funding_locked, revoke_and_ack, commitment_update, order)) }, None => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel", msg.channel_id)) } }; - if let Some(monitor) = chan_monitor { - if let Err(_e) = self.monitor.add_update_monitor(monitor.get_funding_txo().unwrap(), monitor) { - unimplemented!(); - } - } + res } @@ -2204,6 +2326,9 @@ impl ChannelManager { if !chan.is_outbound() { return Err(APIError::APIMisuseError{err: "update_fee cannot be sent for an inbound channel"}); } + if chan.is_awaiting_monitor_update() { + return Err(APIError::MonitorUpdateFailed); + } if !chan.is_live() { return Err(APIError::ChannelUnavailable{err: "Channel is either not yet fully established or peer is currently disconnected"}); } @@ -2553,7 +2678,7 @@ mod tests { use chain::transaction::OutPoint; use chain::chaininterface::ChainListener; use ln::channelmanager::{ChannelManager,OnionKeys}; - use ln::channelmonitor::{CLTV_CLAIM_BUFFER, HTLC_FAIL_TIMEOUT_BLOCKS}; + use ln::channelmonitor::{ChannelMonitorUpdateErr, CLTV_CLAIM_BUFFER, HTLC_FAIL_TIMEOUT_BLOCKS}; use ln::router::{Route, RouteHop, Router}; use ln::msgs; use ln::msgs::{ChannelMessageHandler,RoutingMessageHandler}; @@ -3018,15 +3143,17 @@ mod tests { commitment_msg: msgs::CommitmentSigned, } impl SendEvent { + fn from_commitment_update(node_id: PublicKey, updates: msgs::CommitmentUpdate) -> 
SendEvent { + assert!(updates.update_fulfill_htlcs.is_empty()); + assert!(updates.update_fail_htlcs.is_empty()); + assert!(updates.update_fail_malformed_htlcs.is_empty()); + assert!(updates.update_fee.is_none()); + SendEvent { node_id: node_id, msgs: updates.update_add_htlcs, commitment_msg: updates.commitment_signed } + } + fn from_event(event: Event) -> SendEvent { match event { - Event::UpdateHTLCs { node_id, updates: msgs::CommitmentUpdate { update_add_htlcs, update_fulfill_htlcs, update_fail_htlcs, update_fail_malformed_htlcs, update_fee, commitment_signed } } => { - assert!(update_fulfill_htlcs.is_empty()); - assert!(update_fail_htlcs.is_empty()); - assert!(update_fail_malformed_htlcs.is_empty()); - assert!(update_fee.is_none()); - SendEvent { node_id: node_id, msgs: update_add_htlcs, commitment_msg: commitment_signed } - }, + Event::UpdateHTLCs { node_id, updates } => SendEvent::from_commitment_update(node_id, updates), _ => panic!("Unexpected event type!"), } } @@ -4115,6 +4242,19 @@ mod tests { } } + macro_rules! expect_pending_htlcs_forwardable { + ($node: expr) => {{ + let events = $node.node.get_and_clear_pending_events(); + assert_eq!(events.len(), 1); + match events[0] { + Event::PendingHTLCsForwardable { .. } => { }, + _ => panic!("Unexpected event"), + }; + $node.node.channel_state.lock().unwrap().next_forward = Instant::now(); + $node.node.process_pending_htlc_forwards(); + }} + } + #[test] fn channel_reserve_test() { use util::rng; @@ -4147,19 +4287,6 @@ mod tests { }} }; - macro_rules! expect_pending_htlcs_forwardable { - ($node: expr) => {{ - let events = $node.node.get_and_clear_pending_events(); - assert_eq!(events.len(), 1); - match events[0] { - Event::PendingHTLCsForwardable { .. } => { }, - _ => panic!("Unexpected event"), - }; - $node.node.channel_state.lock().unwrap().next_forward = Instant::now(); - $node.node.process_pending_htlc_forwards(); - }} - }; - macro_rules! expect_forward { ($node: expr) => {{ let mut events = $node.node.get_and_clear_pending_events(); @@ -5453,6 +5580,447 @@ mod tests { claim_payment(&nodes[0], &[&nodes[1]], payment_preimage_2); } + #[test] + fn test_simple_monitor_permanent_update_fail() { + // Test that we handle a simple permanent monitor update failure + let mut nodes = create_network(2); + create_announced_chan_between_nodes(&nodes, 0, 1); + + let route = nodes[0].router.get_route(&nodes[1].node.get_our_node_id(), None, &Vec::new(), 1000000, TEST_FINAL_CLTV).unwrap(); + let (_, payment_hash_1) = get_payment_preimage_hash!(nodes[0]); + + *nodes[0].chan_monitor.update_ret.lock().unwrap() = Err(ChannelMonitorUpdateErr::PermanentFailure); + if let Err(APIError::MonitorUpdateFailed) = nodes[0].node.send_payment(route, payment_hash_1) {} else { panic!(); } + check_added_monitors!(nodes[0], 1); + + let events_1 = nodes[0].node.get_and_clear_pending_events(); + assert_eq!(events_1.len(), 1); + match events_1[0] { + Event::BroadcastChannelUpdate { .. 
} => {}, + _ => panic!("Unexpected event"), + }; + + // TODO: Once we hit the chain with the failure transaction we should check that we get a + // PaymentFailed event + + assert_eq!(nodes[0].node.list_channels().len(), 0); + } + + fn do_test_simple_monitor_temporary_update_fail(disconnect: bool) { + // Test that we can recover from a simple temporary monitor update failure optionally with + // a disconnect in between + let mut nodes = create_network(2); + create_announced_chan_between_nodes(&nodes, 0, 1); + + let route = nodes[0].router.get_route(&nodes[1].node.get_our_node_id(), None, &Vec::new(), 1000000, TEST_FINAL_CLTV).unwrap(); + let (payment_preimage_1, payment_hash_1) = get_payment_preimage_hash!(nodes[0]); + + *nodes[0].chan_monitor.update_ret.lock().unwrap() = Err(ChannelMonitorUpdateErr::TemporaryFailure); + if let Err(APIError::MonitorUpdateFailed) = nodes[0].node.send_payment(route.clone(), payment_hash_1) {} else { panic!(); } + check_added_monitors!(nodes[0], 1); + + let events_1 = nodes[0].node.get_and_clear_pending_events(); + assert!(events_1.is_empty()); + assert_eq!(nodes[0].node.list_channels().len(), 1); + + if disconnect { + nodes[0].node.peer_disconnected(&nodes[1].node.get_our_node_id(), false); + nodes[1].node.peer_disconnected(&nodes[0].node.get_our_node_id(), false); + reconnect_nodes(&nodes[0], &nodes[1], true, (0, 0), (0, 0), (0, 0), (0, 0), (false, false)); + } + + *nodes[0].chan_monitor.update_ret.lock().unwrap() = Ok(()); + nodes[0].node.test_restore_channel_monitor(); + check_added_monitors!(nodes[0], 1); + + let mut events_2 = nodes[0].node.get_and_clear_pending_events(); + assert_eq!(events_2.len(), 1); + let payment_event = SendEvent::from_event(events_2.pop().unwrap()); + assert_eq!(payment_event.node_id, nodes[1].node.get_our_node_id()); + nodes[1].node.handle_update_add_htlc(&nodes[0].node.get_our_node_id(), &payment_event.msgs[0]).unwrap(); + commitment_signed_dance!(nodes[1], nodes[0], payment_event.commitment_msg, false); + + expect_pending_htlcs_forwardable!(nodes[1]); + + let events_3 = nodes[1].node.get_and_clear_pending_events(); + assert_eq!(events_3.len(), 1); + match events_3[0] { + Event::PaymentReceived { ref payment_hash, amt } => { + assert_eq!(payment_hash_1, *payment_hash); + assert_eq!(amt, 1000000); + }, + _ => panic!("Unexpected event"), + } + + claim_payment(&nodes[0], &[&nodes[1]], payment_preimage_1); + + // Now set it to failed again... 
+ let (_, payment_hash_2) = get_payment_preimage_hash!(nodes[0]); + *nodes[0].chan_monitor.update_ret.lock().unwrap() = Err(ChannelMonitorUpdateErr::TemporaryFailure); + if let Err(APIError::MonitorUpdateFailed) = nodes[0].node.send_payment(route, payment_hash_2) {} else { panic!(); } + check_added_monitors!(nodes[0], 1); + + let events_4 = nodes[0].node.get_and_clear_pending_events(); + assert!(events_4.is_empty()); + assert_eq!(nodes[0].node.list_channels().len(), 1); + + if disconnect { + nodes[0].node.peer_disconnected(&nodes[1].node.get_our_node_id(), false); + nodes[1].node.peer_disconnected(&nodes[0].node.get_our_node_id(), false); + reconnect_nodes(&nodes[0], &nodes[1], false, (0, 0), (0, 0), (0, 0), (0, 0), (false, false)); + } + + // ...and make sure we can force-close a TemporaryFailure channel with a PermanentFailure + *nodes[0].chan_monitor.update_ret.lock().unwrap() = Err(ChannelMonitorUpdateErr::PermanentFailure); + nodes[0].node.test_restore_channel_monitor(); + check_added_monitors!(nodes[0], 1); + + let events_5 = nodes[0].node.get_and_clear_pending_events(); + assert_eq!(events_5.len(), 1); + match events_5[0] { + Event::BroadcastChannelUpdate { .. } => {}, + _ => panic!("Unexpected event"), + } + + // TODO: Once we hit the chain with the failure transaction we should check that we get a + // PaymentFailed event + + assert_eq!(nodes[0].node.list_channels().len(), 0); + } + + #[test] + fn test_simple_monitor_temporary_update_fail() { + do_test_simple_monitor_temporary_update_fail(false); + do_test_simple_monitor_temporary_update_fail(true); + } + + fn do_test_monitor_temporary_update_fail(disconnect_count: usize) { + let disconnect_flags = 8 | 16; + + // Test that we can recover from a temporary monitor update failure with some in-flight + // HTLCs going on at the same time potentially with some disconnection thrown in. + // * First we route a payment, then get a temporary monitor update failure when trying to + // route a second payment. We then claim the first payment. + // * If disconnect_count is set, we will disconnect at this point (which is likely as + // TemporaryFailure likely indicates net disconnect which resulted in failing to update + // the ChannelMonitor on a watchtower). + // * If !(disconnect_count & 16) we deliver an update_fulfill_htlc/CS for the first payment + // immediately, otherwise we wait until reconnect and deliver them via the reconnect + // channel_reestablish processing (ie disconnect_count & 16 makes no sense if + // disconnect_count & !disconnect_flags is 0). + // * We then update the channel monitor, reconnecting if disconnect_count is set and walk + // through message sending, potentially disconnect/reconnecting multiple times based on + // disconnect_count, to get the update_fulfill_htlc through. + // * We then walk through more message exchanges to get the original update_add_htlc + // through, swapping message ordering based on disconnect_count & 8 and optionally + // disconnect/reconnecting based on disconnect_count.
+ let mut nodes = create_network(2); + create_announced_chan_between_nodes(&nodes, 0, 1); + + let (payment_preimage_1, _) = route_payment(&nodes[0], &[&nodes[1]], 1000000); + + // Now try to send a second payment which will fail to send + let route = nodes[0].router.get_route(&nodes[1].node.get_our_node_id(), None, &Vec::new(), 1000000, TEST_FINAL_CLTV).unwrap(); + let (payment_preimage_2, payment_hash_2) = get_payment_preimage_hash!(nodes[0]); + + *nodes[0].chan_monitor.update_ret.lock().unwrap() = Err(ChannelMonitorUpdateErr::TemporaryFailure); + if let Err(APIError::MonitorUpdateFailed) = nodes[0].node.send_payment(route.clone(), payment_hash_2) {} else { panic!(); } + check_added_monitors!(nodes[0], 1); + + let events_1 = nodes[0].node.get_and_clear_pending_events(); + assert!(events_1.is_empty()); + assert_eq!(nodes[0].node.list_channels().len(), 1); + + // Claim the previous payment, which will result in a update_fulfill_htlc/CS from nodes[1] + // but nodes[0] won't respond since it is frozen. + assert!(nodes[1].node.claim_funds(payment_preimage_1)); + check_added_monitors!(nodes[1], 1); + let events_2 = nodes[1].node.get_and_clear_pending_events(); + assert_eq!(events_2.len(), 1); + let (bs_initial_fulfill, bs_initial_commitment_signed) = match events_2[0] { + Event::UpdateHTLCs { ref node_id, updates: msgs::CommitmentUpdate { ref update_add_htlcs, ref update_fulfill_htlcs, ref update_fail_htlcs, ref update_fail_malformed_htlcs, ref update_fee, ref commitment_signed } } => { + assert_eq!(*node_id, nodes[0].node.get_our_node_id()); + assert!(update_add_htlcs.is_empty()); + assert_eq!(update_fulfill_htlcs.len(), 1); + assert!(update_fail_htlcs.is_empty()); + assert!(update_fail_malformed_htlcs.is_empty()); + assert!(update_fee.is_none()); + + if (disconnect_count & 16) == 0 { + nodes[0].node.handle_update_fulfill_htlc(&nodes[1].node.get_our_node_id(), &update_fulfill_htlcs[0]).unwrap(); + let events_3 = nodes[0].node.get_and_clear_pending_events(); + assert_eq!(events_3.len(), 1); + match events_3[0] { + Event::PaymentSent { ref payment_preimage } => { + assert_eq!(*payment_preimage, payment_preimage_1); + }, + _ => panic!("Unexpected event"), + } + + if let Err(msgs::HandleError{err, action: Some(msgs::ErrorAction::IgnoreError) }) = nodes[0].node.handle_commitment_signed(&nodes[1].node.get_our_node_id(), commitment_signed) { + assert_eq!(err, "Previous monitor update failure prevented generation of RAA"); + } else { panic!(); } + } + + (update_fulfill_htlcs[0].clone(), commitment_signed.clone()) + }, + _ => panic!("Unexpected event"), + }; + + if disconnect_count & !disconnect_flags > 0 { + nodes[0].node.peer_disconnected(&nodes[1].node.get_our_node_id(), false); + nodes[1].node.peer_disconnected(&nodes[0].node.get_our_node_id(), false); + } + + // Now fix monitor updating... + *nodes[0].chan_monitor.update_ret.lock().unwrap() = Ok(()); + nodes[0].node.test_restore_channel_monitor(); + check_added_monitors!(nodes[0], 1); + + macro_rules! 
disconnect_reconnect_peers { () => { { + nodes[0].node.peer_disconnected(&nodes[1].node.get_our_node_id(), false); + nodes[1].node.peer_disconnected(&nodes[0].node.get_our_node_id(), false); + + let reestablish_1 = nodes[0].node.peer_connected(&nodes[1].node.get_our_node_id()); + assert_eq!(reestablish_1.len(), 1); + let reestablish_2 = nodes[1].node.peer_connected(&nodes[0].node.get_our_node_id()); + assert_eq!(reestablish_2.len(), 1); + + let as_resp = nodes[0].node.handle_channel_reestablish(&nodes[1].node.get_our_node_id(), &reestablish_2[0]).unwrap(); + let bs_resp = nodes[1].node.handle_channel_reestablish(&nodes[0].node.get_our_node_id(), &reestablish_1[0]).unwrap(); + + assert!(as_resp.0.is_none()); + assert!(bs_resp.0.is_none()); + + (reestablish_1, reestablish_2, as_resp, bs_resp) + } } } + + let (payment_event, initial_revoke_and_ack) = if disconnect_count & !disconnect_flags > 0 { + let events_4 = nodes[0].node.get_and_clear_pending_events(); + assert!(events_4.is_empty()); + + let reestablish_1 = nodes[0].node.peer_connected(&nodes[1].node.get_our_node_id()); + assert_eq!(reestablish_1.len(), 1); + let reestablish_2 = nodes[1].node.peer_connected(&nodes[0].node.get_our_node_id()); + assert_eq!(reestablish_2.len(), 1); + + let mut as_resp = nodes[0].node.handle_channel_reestablish(&nodes[1].node.get_our_node_id(), &reestablish_2[0]).unwrap(); + check_added_monitors!(nodes[0], 0); + let mut bs_resp = nodes[1].node.handle_channel_reestablish(&nodes[0].node.get_our_node_id(), &reestablish_1[0]).unwrap(); + check_added_monitors!(nodes[1], 0); + + assert!(as_resp.0.is_none()); + assert!(bs_resp.0.is_none()); + + assert!(bs_resp.1.is_none()); + if (disconnect_count & 16) == 0 { + assert!(bs_resp.2.is_none()); + + assert!(as_resp.1.is_some()); + assert!(as_resp.2.is_some()); + assert!(as_resp.3 == msgs::RAACommitmentOrder::CommitmentFirst); + } else { + assert!(bs_resp.2.as_ref().unwrap().update_add_htlcs.is_empty()); + assert!(bs_resp.2.as_ref().unwrap().update_fail_htlcs.is_empty()); + assert!(bs_resp.2.as_ref().unwrap().update_fail_malformed_htlcs.is_empty()); + assert!(bs_resp.2.as_ref().unwrap().update_fee.is_none()); + assert!(bs_resp.2.as_ref().unwrap().update_fulfill_htlcs == vec![bs_initial_fulfill]); + assert!(bs_resp.2.as_ref().unwrap().commitment_signed == bs_initial_commitment_signed); + + assert!(as_resp.1.is_none()); + + nodes[0].node.handle_update_fulfill_htlc(&nodes[1].node.get_our_node_id(), &bs_resp.2.as_ref().unwrap().update_fulfill_htlcs[0]).unwrap(); + let events_3 = nodes[0].node.get_and_clear_pending_events(); + assert_eq!(events_3.len(), 1); + match events_3[0] { + Event::PaymentSent { ref payment_preimage } => { + assert_eq!(*payment_preimage, payment_preimage_1); + }, + _ => panic!("Unexpected event"), + } + + let (as_resp_raa, as_resp_cu) = nodes[0].node.handle_commitment_signed(&nodes[1].node.get_our_node_id(), &bs_resp.2.as_ref().unwrap().commitment_signed).unwrap(); + assert!(as_resp_cu.is_none()); + check_added_monitors!(nodes[0], 1); + + as_resp.1 = Some(as_resp_raa); + bs_resp.2 = None; + } + + if disconnect_count & !disconnect_flags > 1 { + let (second_reestablish_1, second_reestablish_2, second_as_resp, second_bs_resp) = disconnect_reconnect_peers!(); + + if (disconnect_count & 16) == 0 { + assert!(reestablish_1 == second_reestablish_1); + assert!(reestablish_2 == second_reestablish_2); + } + assert!(as_resp == second_as_resp); + assert!(bs_resp == second_bs_resp); + } + + (SendEvent::from_commitment_update(nodes[1].node.get_our_node_id(), 
as_resp.2.unwrap()), as_resp.1.unwrap()) + } else { + let mut events_4 = nodes[0].node.get_and_clear_pending_events(); + assert_eq!(events_4.len(), 2); + (SendEvent::from_event(events_4.remove(0)), match events_4[0] { + Event::SendRevokeAndACK { ref node_id, ref msg } => { + assert_eq!(*node_id, nodes[1].node.get_our_node_id()); + msg.clone() + }, + _ => panic!("Unexpected event"), + }) + }; + + assert_eq!(payment_event.node_id, nodes[1].node.get_our_node_id()); + + nodes[1].node.handle_update_add_htlc(&nodes[0].node.get_our_node_id(), &payment_event.msgs[0]).unwrap(); + let (bs_revoke_and_ack, bs_commitment_signed) = nodes[1].node.handle_commitment_signed(&nodes[0].node.get_our_node_id(), &payment_event.commitment_msg).unwrap(); + assert!(bs_commitment_signed.is_none()); // nodes[1] is awaiting an RAA from nodes[0] still + check_added_monitors!(nodes[1], 1); + + if disconnect_count & !disconnect_flags > 2 { + let (_, _, as_resp, bs_resp) = disconnect_reconnect_peers!(); + + assert!(as_resp.1.unwrap() == initial_revoke_and_ack); + assert!(bs_resp.1.unwrap() == bs_revoke_and_ack); + + assert!(as_resp.2.is_none()); + assert!(bs_resp.2.is_none()); + } + + let as_commitment_update; + let bs_second_commitment_update; + + macro_rules! handle_bs_raa { () => { + as_commitment_update = nodes[0].node.handle_revoke_and_ack(&nodes[1].node.get_our_node_id(), &bs_revoke_and_ack).unwrap().unwrap(); + assert!(as_commitment_update.update_add_htlcs.is_empty()); + assert!(as_commitment_update.update_fulfill_htlcs.is_empty()); + assert!(as_commitment_update.update_fail_htlcs.is_empty()); + assert!(as_commitment_update.update_fail_malformed_htlcs.is_empty()); + assert!(as_commitment_update.update_fee.is_none()); + check_added_monitors!(nodes[0], 1); + } } + + macro_rules! 
handle_initial_raa { () => { + bs_second_commitment_update = nodes[1].node.handle_revoke_and_ack(&nodes[0].node.get_our_node_id(), &initial_revoke_and_ack).unwrap().unwrap(); + assert!(bs_second_commitment_update.update_add_htlcs.is_empty()); + assert!(bs_second_commitment_update.update_fulfill_htlcs.is_empty()); + assert!(bs_second_commitment_update.update_fail_htlcs.is_empty()); + assert!(bs_second_commitment_update.update_fail_malformed_htlcs.is_empty()); + assert!(bs_second_commitment_update.update_fee.is_none()); + check_added_monitors!(nodes[1], 1); + } } + + if (disconnect_count & 8) == 0 { + handle_bs_raa!(); + + if disconnect_count & !disconnect_flags > 3 { + let (_, _, as_resp, bs_resp) = disconnect_reconnect_peers!(); + + assert!(as_resp.1.unwrap() == initial_revoke_and_ack); + assert!(bs_resp.1.is_none()); + + assert!(as_resp.2.unwrap() == as_commitment_update); + assert!(bs_resp.2.is_none()); + + assert!(as_resp.3 == msgs::RAACommitmentOrder::RevokeAndACKFirst); + } + + handle_initial_raa!(); + + if disconnect_count & !disconnect_flags > 4 { + let (_, _, as_resp, bs_resp) = disconnect_reconnect_peers!(); + + assert!(as_resp.1.is_none()); + assert!(bs_resp.1.is_none()); + + assert!(as_resp.2.unwrap() == as_commitment_update); + assert!(bs_resp.2.unwrap() == bs_second_commitment_update); + } + } else { + handle_initial_raa!(); + + if disconnect_count & !disconnect_flags > 3 { + let (_, _, as_resp, bs_resp) = disconnect_reconnect_peers!(); + + assert!(as_resp.1.is_none()); + assert!(bs_resp.1.unwrap() == bs_revoke_and_ack); + + assert!(as_resp.2.is_none()); + assert!(bs_resp.2.unwrap() == bs_second_commitment_update); + + assert!(bs_resp.3 == msgs::RAACommitmentOrder::RevokeAndACKFirst); + } + + handle_bs_raa!(); + + if disconnect_count & !disconnect_flags > 4 { + let (_, _, as_resp, bs_resp) = disconnect_reconnect_peers!(); + + assert!(as_resp.1.is_none()); + assert!(bs_resp.1.is_none()); + + assert!(as_resp.2.unwrap() == as_commitment_update); + assert!(bs_resp.2.unwrap() == bs_second_commitment_update); + } + } + + let (as_revoke_and_ack, as_commitment_signed) = nodes[0].node.handle_commitment_signed(&nodes[1].node.get_our_node_id(), &bs_second_commitment_update.commitment_signed).unwrap(); + assert!(as_commitment_signed.is_none()); + check_added_monitors!(nodes[0], 1); + + let (bs_second_revoke_and_ack, bs_third_commitment_signed) = nodes[1].node.handle_commitment_signed(&nodes[0].node.get_our_node_id(), &as_commitment_update.commitment_signed).unwrap(); + assert!(bs_third_commitment_signed.is_none()); + check_added_monitors!(nodes[1], 1); + + assert!(nodes[1].node.handle_revoke_and_ack(&nodes[0].node.get_our_node_id(), &as_revoke_and_ack).unwrap().is_none()); + check_added_monitors!(nodes[1], 1); + + assert!(nodes[0].node.handle_revoke_and_ack(&nodes[1].node.get_our_node_id(), &bs_second_revoke_and_ack).unwrap().is_none()); + check_added_monitors!(nodes[0], 1); + + expect_pending_htlcs_forwardable!(nodes[1]); + + let events_5 = nodes[1].node.get_and_clear_pending_events(); + assert_eq!(events_5.len(), 1); + match events_5[0] { + Event::PaymentReceived { ref payment_hash, amt } => { + assert_eq!(payment_hash_2, *payment_hash); + assert_eq!(amt, 1000000); + }, + _ => panic!("Unexpected event"), + } + + claim_payment(&nodes[0], &[&nodes[1]], payment_preimage_2); + } + + #[test] + fn test_monitor_temporary_update_fail_a() { + do_test_monitor_temporary_update_fail(0); + do_test_monitor_temporary_update_fail(1); + do_test_monitor_temporary_update_fail(2); + 
do_test_monitor_temporary_update_fail(3); + do_test_monitor_temporary_update_fail(4); + do_test_monitor_temporary_update_fail(5); + } + + #[test] + fn test_monitor_temporary_update_fail_b() { + do_test_monitor_temporary_update_fail(2 | 8); + do_test_monitor_temporary_update_fail(3 | 8); + do_test_monitor_temporary_update_fail(4 | 8); + do_test_monitor_temporary_update_fail(5 | 8); + } + + #[test] + fn test_monitor_temporary_update_fail_c() { + do_test_monitor_temporary_update_fail(1 | 16); + do_test_monitor_temporary_update_fail(2 | 16); + do_test_monitor_temporary_update_fail(3 | 16); + do_test_monitor_temporary_update_fail(2 | 8 | 16); + do_test_monitor_temporary_update_fail(3 | 8 | 16); + } + #[test] fn test_invalid_channel_announcement() { //Test BOLT 7 channel_announcement msg requirement for final node, gather data to build customed channel_announcement msgs diff --git a/src/ln/channelmonitor.rs b/src/ln/channelmonitor.rs index 59ed177b..60cb9c91 100644 --- a/src/ln/channelmonitor.rs +++ b/src/ln/channelmonitor.rs @@ -39,6 +39,7 @@ use std::sync::{Arc,Mutex}; use std::{hash,cmp}; /// An error enum representing a failure to persist a channel monitor update. +#[derive(Clone)] pub enum ChannelMonitorUpdateErr { /// Used to indicate a temporary failure (eg connection to a watchtower failed, but is expected /// to succeed at some point in the future). @@ -47,6 +48,22 @@ pub enum ChannelMonitorUpdateErr { /// submitting new commitment transactions to the remote party. /// ChannelManager::test_restore_channel_monitor can be used to retry the update(s) and restore /// the channel to an operational state. + /// + /// Note that continuing to operate when no copy of the updated ChannelMonitor could be + /// persisted is unsafe - if you failed to store the update on your own local disk you should + /// instead return PermanentFailure to force closure of the channel ASAP. + /// + /// Even when a channel has been "frozen" updates to the ChannelMonitor can continue to occur + /// (eg if an inbound HTLC which we forwarded was claimed upstream resulting in us attempting + /// to claim it on this channel) and those updates must be applied wherever they can be. At + /// least one such updated ChannelMonitor must be persisted otherwise PermanentFailure should + /// be returned to get things on-chain ASAP using only the in-memory copy. Obviously updates to + /// the channel which would invalidate previous ChannelMonitors are not made when a channel has + /// been "frozen". + /// + /// Note that even if updates made after TemporaryFailure succeed you must still call + /// test_restore_channel_monitor to ensure you have the latest monitor and re-enable normal + /// channel operation. 
TemporaryFailure, /// Used to indicate no further channel monitor updates will be allowed (eg we've moved on to a /// different watchtower and cannot update with all watchtowers that were previously informed diff --git a/src/ln/msgs.rs b/src/ln/msgs.rs index bab2674e..b5db51cf 100644 --- a/src/ln/msgs.rs +++ b/src/ln/msgs.rs @@ -224,7 +224,7 @@ pub struct FundingSigned { } /// A funding_locked message to be sent or received from a peer -#[derive(Clone)] +#[derive(Clone, PartialEq)] pub struct FundingLocked { pub(crate) channel_id: [u8; 32], pub(crate) next_per_commitment_point: PublicKey, @@ -244,7 +244,7 @@ pub struct ClosingSigned { } /// An update_add_htlc message to be sent or received from a peer -#[derive(Clone)] +#[derive(Clone, PartialEq)] pub struct UpdateAddHTLC { pub(crate) channel_id: [u8; 32], pub(crate) htlc_id: u64, @@ -255,7 +255,7 @@ pub struct UpdateAddHTLC { } /// An update_fulfill_htlc message to be sent or received from a peer -#[derive(Clone)] +#[derive(Clone, PartialEq)] pub struct UpdateFulfillHTLC { pub(crate) channel_id: [u8; 32], pub(crate) htlc_id: u64, @@ -263,7 +263,7 @@ pub struct UpdateFulfillHTLC { } /// An update_fail_htlc message to be sent or received from a peer -#[derive(Clone)] +#[derive(Clone, PartialEq)] pub struct UpdateFailHTLC { pub(crate) channel_id: [u8; 32], pub(crate) htlc_id: u64, @@ -271,7 +271,7 @@ pub struct UpdateFailHTLC { } /// An update_fail_malformed_htlc message to be sent or received from a peer -#[derive(Clone)] +#[derive(Clone, PartialEq)] pub struct UpdateFailMalformedHTLC { pub(crate) channel_id: [u8; 32], pub(crate) htlc_id: u64, @@ -280,7 +280,7 @@ pub struct UpdateFailMalformedHTLC { } /// A commitment_signed message to be sent or received from a peer -#[derive(Clone)] +#[derive(Clone, PartialEq)] pub struct CommitmentSigned { pub(crate) channel_id: [u8; 32], pub(crate) signature: Signature, @@ -288,6 +288,7 @@ pub struct CommitmentSigned { } /// A revoke_and_ack message to be sent or received from a peer +#[derive(Clone, PartialEq)] pub struct RevokeAndACK { pub(crate) channel_id: [u8; 32], pub(crate) per_commitment_secret: [u8; 32], @@ -295,17 +296,20 @@ pub struct RevokeAndACK { } /// An update_fee message to be sent or received from a peer +#[derive(PartialEq)] pub struct UpdateFee { pub(crate) channel_id: [u8; 32], pub(crate) feerate_per_kw: u32, } +#[derive(PartialEq)] pub(crate) struct DataLossProtect { pub(crate) your_last_per_commitment_secret: [u8; 32], pub(crate) my_current_per_commitment_point: PublicKey, } /// A channel_reestablish message to be sent or received from a peer +#[derive(PartialEq)] pub struct ChannelReestablish { pub(crate) channel_id: [u8; 32], pub(crate) next_local_commitment_number: u64, @@ -463,6 +467,7 @@ pub struct HandleError { //TODO: rename me /// Struct used to return values from revoke_and_ack messages, containing a bunch of commitment /// transaction updates if they were pending. 
+#[derive(PartialEq)] pub struct CommitmentUpdate { pub(crate) update_add_htlcs: Vec, pub(crate) update_fulfill_htlcs: Vec, @@ -629,7 +634,18 @@ pub(crate) struct OnionPacket { pub(crate) hmac: [u8; 32], } -#[derive(Clone)] +impl PartialEq for OnionPacket { + fn eq(&self, other: &OnionPacket) -> bool { + for (i, j) in self.hop_data.iter().zip(other.hop_data.iter()) { + if i != j { return false; } + } + self.version == other.version && + self.public_key == other.public_key && + self.hmac == other.hmac + } +} + +#[derive(Clone, PartialEq)] pub(crate) struct OnionErrorPacket { // This really should be a constant size slice, but the spec lets these things be up to 128KB? // (TODO) We limit it in decode to much lower... diff --git a/src/ln/peer_handler.rs b/src/ln/peer_handler.rs index b629e7fa..94cdef9e 100644 --- a/src/ln/peer_handler.rs +++ b/src/ln/peer_handler.rs @@ -866,6 +866,17 @@ impl PeerManager { Self::do_attempt_write_data(&mut descriptor, peer); continue; }, + Event::SendRevokeAndACK { ref node_id, ref msg } => { + log_trace!(self, "Handling SendRevokeAndACK event in peer_handler for node {} for channel {}", + log_pubkey!(node_id), + log_bytes!(msg.channel_id)); + let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, { + //TODO: Do whatever we're gonna do for handling dropped messages + }); + peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 133))); + Self::do_attempt_write_data(&mut descriptor, peer); + continue; + }, Event::SendShutdown { ref node_id, ref msg } => { log_trace!(self, "Handling Shutdown event in peer_handler for node {} for channel {}", log_pubkey!(node_id), diff --git a/src/util/errors.rs b/src/util/errors.rs index 9446b358..66ddf84d 100644 --- a/src/util/errors.rs +++ b/src/util/errors.rs @@ -32,7 +32,10 @@ pub enum APIError { ChannelUnavailable { /// A human-readable error message err: &'static str - } + }, + /// An attempt to call add_update_monitor returned an Err (ie you did this!), causing the + /// attempted action to fail. + MonitorUpdateFailed, } impl fmt::Debug for APIError { @@ -42,6 +45,7 @@ impl fmt::Debug for APIError { APIError::FeeRateTooHigh {ref err, ref feerate} => write!(f, "{} feerate: {}", err, feerate), APIError::RouteError {ref err} => f.write_str(err), APIError::ChannelUnavailable {ref err} => f.write_str(err), + APIError::MonitorUpdateFailed => f.write_str("Client indicated a channel monitor update failed"), } } } diff --git a/src/util/events.rs b/src/util/events.rs index 51417c63..e11e4e82 100644 --- a/src/util/events.rs +++ b/src/util/events.rs @@ -129,6 +129,15 @@ pub enum Event { /// The update messages which should be sent. ALL messages in the struct should be sent! updates: msgs::CommitmentUpdate, }, + /// Used to indicate that a revoke_and_ack message should be sent to the peer with the given node_id. + /// + /// This event is handled by PeerManager::process_events if you are using a PeerManager. + SendRevokeAndACK { + /// The node_id of the node which should receive this message + node_id: PublicKey, + /// The message which should be sent. + msg: msgs::RevokeAndACK, + }, /// Used to indicate that a shutdown message should be sent to the peer with the given node_id. /// /// This event is handled by PeerManager::process_events if you are using a PeerManager. 
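As the events.rs documentation above says, SendRevokeAndACK is handled by PeerManager::process_events when a PeerManager is in use. For callers that drive events themselves, a minimal sketch of the equivalent handling, assuming this era's in-crate paths (secp256k1::key::PublicKey, util::events::EventsProvider) and with deliver_raa standing in for whatever transport the caller actually has:

	use secp256k1::key::PublicKey;

	use ln::msgs;
	use util::events::{Event, EventsProvider};

	/// Sketch: drain SendRevokeAndACK events and hand them to the caller's own transport.
	/// `deliver_raa` is a hypothetical callback doing message serialization plus socket write.
	fn flush_revoke_and_ack_events<E: EventsProvider, F: FnMut(&PublicKey, &msgs::RevokeAndACK)>(events: &E, mut deliver_raa: F) {
		for event in events.get_and_clear_pending_events() {
			match event {
				Event::SendRevokeAndACK { ref node_id, ref msg } => deliver_raa(node_id, msg),
				// All other events are handled exactly as they were before this patch.
				_ => {},
			}
		}
	}

PeerManager does the same thing internally, encoding the message as type 133 and pushing the encrypted bytes onto the peer's outbound buffer, as shown in the peer_handler.rs hunk above.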
diff --git a/src/util/test_utils.rs b/src/util/test_utils.rs index 8ab02b26..2577bc9f 100644 --- a/src/util/test_utils.rs +++ b/src/util/test_utils.rs @@ -38,12 +38,14 @@ impl chaininterface::FeeEstimator for TestFeeEstimator { pub struct TestChannelMonitor { pub added_monitors: Mutex<Vec<(OutPoint, channelmonitor::ChannelMonitor)>>, pub simple_monitor: Arc<channelmonitor::SimpleManyChannelMonitor<OutPoint>>, + pub update_ret: Mutex<Result<(), channelmonitor::ChannelMonitorUpdateErr>>, } impl TestChannelMonitor { pub fn new(chain_monitor: Arc<chaininterface::ChainWatchInterface>, broadcaster: Arc<chaininterface::BroadcasterInterface>) -> Self { Self { added_monitors: Mutex::new(Vec::new()), simple_monitor: channelmonitor::SimpleManyChannelMonitor::new(chain_monitor, broadcaster), + update_ret: Mutex::new(Ok(())), } } } @@ -57,7 +59,8 @@ impl channelmonitor::ManyChannelMonitor for TestChannelMonitor { w.0.clear(); monitor.write_for_watchtower(&mut w).unwrap(); // This at least shouldn't crash... self.added_monitors.lock().unwrap().push((funding_txo, monitor.clone())); - self.simple_monitor.add_update_monitor(funding_txo, monitor) + assert!(self.simple_monitor.add_update_monitor(funding_txo, monitor).is_ok()); + self.update_ret.lock().unwrap().clone() } }
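Outside the test framework, the contract documented on ChannelMonitorUpdateErr above amounts to: always persist the monitor locally, return TemporaryFailure only for failures expected to heal (such as an unreachable watchtower), and PermanentFailure otherwise. Below is a rough user-side sketch of that rule; the type paths, the persist_to_disk/push_to_watchtower helpers, and the assumption that add_update_monitor is the trait's only required method at this point (as in test_utils above) are assumptions, not anything this patch provides:

	use std::sync::Mutex;

	use chain::transaction::OutPoint;
	use ln::channelmonitor::{self, ChannelMonitorUpdateErr, ManyChannelMonitor};

	/// Hypothetical user-side persister: writes every monitor update to disk and mirrors it to
	/// a watchtower, mapping failures onto the ChannelMonitorUpdateErr semantics above.
	struct DiskAndWatchtowerMonitor {
		watchtower_reachable: Mutex<bool>,
	}

	impl ManyChannelMonitor for DiskAndWatchtowerMonitor {
		fn add_update_monitor(&self, funding_txo: OutPoint, monitor: channelmonitor::ChannelMonitor) -> Result<(), ChannelMonitorUpdateErr> {
			// If we cannot even store the update locally, continuing to operate is unsafe:
			// return PermanentFailure so the channel is force-closed ASAP.
			if persist_to_disk(&funding_txo, &monitor).is_err() {
				return Err(ChannelMonitorUpdateErr::PermanentFailure);
			}
			// A watchtower outage is expected to heal, so freeze the channel instead. The user
			// then calls ChannelManager::test_restore_channel_monitor once the tower catches up.
			if !*self.watchtower_reachable.lock().unwrap() || push_to_watchtower(&funding_txo, &monitor).is_err() {
				return Err(ChannelMonitorUpdateErr::TemporaryFailure);
			}
			Ok(())
		}
	}

	// Placeholder persistence helpers; these are assumptions, not library APIs.
	fn persist_to_disk(_txo: &OutPoint, _monitor: &channelmonitor::ChannelMonitor) -> Result<(), ()> { Ok(()) }
	fn push_to_watchtower(_txo: &OutPoint, _monitor: &channelmonitor::ChannelMonitor) -> Result<(), ()> { Ok(()) }

On the ChannelManager side the freeze shows up as APIError::MonitorUpdateFailed from calls such as send_payment, and is lifted by calling test_restore_channel_monitor once the update really has been persisted, exactly as the tests above exercise.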