Merge pull request #213 from TheBlueMatt/2018-10-monitor-fail-pause
author Matt Corallo <649246+TheBlueMatt@users.noreply.github.com>
Tue, 23 Oct 2018 20:45:50 +0000 (16:45 -0400)
committer GitHub <noreply@github.com>
Tue, 23 Oct 2018 20:45:50 +0000 (16:45 -0400)
Add ChannelManager support for monitor update failure in one place

src/ln/channel.rs
src/ln/channelmanager.rs
src/ln/channelmonitor.rs
src/ln/msgs.rs
src/ln/peer_handler.rs
src/util/errors.rs
src/util/events.rs
src/util/test_utils.rs

index 992807493e8d78b05b5649423df602055f96f332..6b1511387d61d0048c8e8b1b29b81378df81c1b3 100644 (file)
@@ -248,28 +248,32 @@ enum ChannelState {
        /// "disconnected" and no updates are allowed until after we've done a channel_reestablish
        /// dance.
        PeerDisconnected = (1 << 7),
+       /// Flag which is set on ChannelFunded or FundingSent to indicate that the user has told us
+       /// they failed to update our ChannelMonitor somewhere, and that we should pause sending any
+       /// outbound messages until they've managed to do so.
+       MonitorUpdateFailed = (1 << 8),
        /// Flag which implies that we have sent a commitment_signed but are awaiting the responding
        /// revoke_and_ack message. During this time period, we can't generate new commitment_signed
        /// messages as then we will be unable to determine which HTLCs they included in their
        /// revoke_and_ack implicit ACK, so instead we have to hold them away temporarily to be sent
        /// later.
        /// Flag is set on ChannelFunded.
-       AwaitingRemoteRevoke = (1 << 8),
+       AwaitingRemoteRevoke = (1 << 9),
        /// Flag which is set on ChannelFunded or FundingSent after receiving a shutdown message from
        /// the remote end. If set, they may not add any new HTLCs to the channel, and we are expected
        /// to respond with our own shutdown message when possible.
-       RemoteShutdownSent = (1 << 9),
+       RemoteShutdownSent = (1 << 10),
        /// Flag which is set on ChannelFunded or FundingSent after sending a shutdown message. At this
        /// point, we may not add any new HTLCs to the channel.
        /// TODO: Investigate some kind of timeout mechanism by which point the remote end must provide
        /// us their shutdown.
-       LocalShutdownSent = (1 << 10),
+       LocalShutdownSent = (1 << 11),
        /// We've successfully negotiated a closing_signed dance. At this point ChannelManager is about
        /// to drop us, but we store this anyway.
-       ShutdownComplete = 2048,
+       ShutdownComplete = 4096,
 }
 const BOTH_SIDES_SHUTDOWN_MASK: u32 = (ChannelState::LocalShutdownSent as u32 | ChannelState::RemoteShutdownSent as u32);
-const MULTI_STATE_FLAGS: u32 = (BOTH_SIDES_SHUTDOWN_MASK | ChannelState::PeerDisconnected as u32);
+const MULTI_STATE_FLAGS: u32 = (BOTH_SIDES_SHUTDOWN_MASK | ChannelState::PeerDisconnected as u32 | ChannelState::MonitorUpdateFailed as u32);
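+// i.e. (1 << 7) | (1 << 8) | (1 << 10) | (1 << 11): any of these flags may be set on top of the
+// funding-progress states above, so they are masked out before comparing against those states.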
 
 const INITIAL_COMMITMENT_NUMBER: u64 = (1 << 48) - 1;
 
@@ -306,6 +310,12 @@ pub(super) struct Channel {
        pending_outbound_htlcs: Vec<OutboundHTLCOutput>,
        holding_cell_htlc_updates: Vec<HTLCUpdateAwaitingACK>,
 
+       monitor_pending_revoke_and_ack: bool,
+       monitor_pending_commitment_signed: bool,
+       monitor_pending_order: Option<RAACommitmentOrder>,
+       monitor_pending_forwards: Vec<(PendingForwardHTLCInfo, u64)>,
+       monitor_pending_failures: Vec<(HTLCSource, [u8; 32], HTLCFailReason)>,
+
        // pending_update_fee is filled when sending and receiving update_fee
        // For outbound channel, feerate_per_kw is updated with the value from
        // pending_update_fee when revoke_and_ack is received
@@ -509,6 +519,12 @@ impl Channel {
                        next_remote_htlc_id: 0,
                        channel_update_count: 1,
 
+                       monitor_pending_revoke_and_ack: false,
+                       monitor_pending_commitment_signed: false,
+                       monitor_pending_order: None,
+                       monitor_pending_forwards: Vec::new(),
+                       monitor_pending_failures: Vec::new(),
+
                        #[cfg(debug_assertions)]
                        max_commitment_tx_output_local: ::std::sync::Mutex::new((channel_value_satoshis * 1000 - push_msat, push_msat)),
                        #[cfg(debug_assertions)]
@@ -666,6 +682,12 @@ impl Channel {
                        next_remote_htlc_id: 0,
                        channel_update_count: 1,
 
+                       monitor_pending_revoke_and_ack: false,
+                       monitor_pending_commitment_signed: false,
+                       monitor_pending_order: None,
+                       monitor_pending_forwards: Vec::new(),
+                       monitor_pending_failures: Vec::new(),
+
                        #[cfg(debug_assertions)]
                        max_commitment_tx_output_local: ::std::sync::Mutex::new((msg.push_msat, msg.funding_satoshis * 1000 - msg.push_msat)),
                        #[cfg(debug_assertions)]
@@ -1166,7 +1188,7 @@ impl Channel {
                // can claim it even if the channel hits the chain before we see their next commitment.
                self.channel_monitor.provide_payment_preimage(&payment_hash_calc, &payment_preimage_arg);
 
-               if (self.channel_state & (ChannelState::AwaitingRemoteRevoke as u32 | ChannelState::PeerDisconnected as u32)) != 0 {
+               if (self.channel_state & (ChannelState::AwaitingRemoteRevoke as u32 | ChannelState::PeerDisconnected as u32 | ChannelState::MonitorUpdateFailed as u32)) != 0 {
                        for pending_update in self.holding_cell_htlc_updates.iter() {
                                match pending_update {
                                        &HTLCUpdateAwaitingACK::ClaimHTLC { htlc_id, .. } => {
@@ -1243,7 +1265,7 @@ impl Channel {
                }
 
                // Now update local state:
-               if (self.channel_state & (ChannelState::AwaitingRemoteRevoke as u32 | ChannelState::PeerDisconnected as u32)) != 0 {
+               if (self.channel_state & (ChannelState::AwaitingRemoteRevoke as u32 | ChannelState::PeerDisconnected as u32 | ChannelState::MonitorUpdateFailed as u32)) != 0 {
                        for pending_update in self.holding_cell_htlc_updates.iter() {
                                match pending_update {
                                        &HTLCUpdateAwaitingACK::ClaimHTLC { htlc_id, .. } => {
@@ -1461,11 +1483,13 @@ impl Channel {
                if self.channel_state & (ChannelState::PeerDisconnected as u32) == ChannelState::PeerDisconnected as u32 {
                        return Err(ChannelError::Close("Peer sent funding_locked when we needed a channel_reestablish"));
                }
-               let non_shutdown_state = self.channel_state & (!BOTH_SIDES_SHUTDOWN_MASK);
+
+               let non_shutdown_state = self.channel_state & (!MULTI_STATE_FLAGS);
+
                if non_shutdown_state == ChannelState::FundingSent as u32 {
                        self.channel_state |= ChannelState::TheirFundingLocked as u32;
                } else if non_shutdown_state == (ChannelState::FundingSent as u32 | ChannelState::OurFundingLocked as u32) {
-                       self.channel_state = ChannelState::ChannelFunded as u32 | (self.channel_state & BOTH_SIDES_SHUTDOWN_MASK);
+                       self.channel_state = ChannelState::ChannelFunded as u32 | (self.channel_state & MULTI_STATE_FLAGS);
                        self.channel_update_count += 1;
                } else if self.channel_state & (ChannelState::ChannelFunded as u32) != 0 &&
                                // Note that funding_signed/funding_created will have decremented both by 1!
@@ -1685,6 +1709,11 @@ impl Channel {
                                }
                        }
                }
+               if self.channel_state & (ChannelState::MonitorUpdateFailed as u32) == 0 {
+                       // This is a response to our post-monitor-failed unfreeze messages, so we can clear the
+                       // monitor_pending_order requirement as we won't re-send the monitor_pending messages.
+                       self.monitor_pending_order = None;
+               }
 
                self.channel_monitor.provide_latest_local_commitment_tx_info(local_commitment_tx.0, local_keys, self.feerate_per_kw, htlcs_and_sigs);
 
@@ -1708,6 +1737,12 @@ impl Channel {
                self.last_local_commitment_txn = new_local_commitment_txn;
                self.received_commitment_while_awaiting_raa = (self.channel_state & (ChannelState::AwaitingRemoteRevoke as u32)) != 0;
 
+               if (self.channel_state & ChannelState::MonitorUpdateFailed as u32) != 0 {
+                       self.monitor_pending_revoke_and_ack = true;
+                       self.monitor_pending_commitment_signed |= need_our_commitment;
+                       return Err(HandleError{err: "Previous monitor update failure prevented generation of RAA", action: Some(ErrorAction::IgnoreError)});
+               }
+
                let (our_commitment_signed, monitor_update) = if need_our_commitment && (self.channel_state & (ChannelState::AwaitingRemoteRevoke as u32)) == 0 {
                        // If we're AwaitingRemoteRevoke we can't send a new commitment here, but that's ok -
                        // we'll send one right away when we get the revoke_and_ack when we
@@ -1726,6 +1761,7 @@ impl Channel {
        /// Used to fulfill holding_cell_htlcs when we get a remote ack (or implicitly get it by them
        /// fulfilling or failing the last pending HTLC)
        fn free_holding_cell_htlcs(&mut self) -> Result<Option<(msgs::CommitmentUpdate, ChannelMonitor)>, HandleError> {
+               assert_eq!(self.channel_state & ChannelState::MonitorUpdateFailed as u32, 0);
                if self.holding_cell_htlc_updates.len() != 0 || self.holding_cell_update_fee.is_some() {
                        let mut htlc_updates = Vec::new();
                        mem::swap(&mut htlc_updates, &mut self.holding_cell_htlc_updates);
@@ -1827,6 +1863,7 @@ impl Channel {
                if self.channel_state & (ChannelState::PeerDisconnected as u32) == ChannelState::PeerDisconnected as u32 {
                        return Err(HandleError{err: "Peer sent revoke_and_ack when we needed a channel_reestablish", action: Some(msgs::ErrorAction::SendErrorMessage{msg: msgs::ErrorMessage{data: "Peer sent revoke_and_ack when we needed a channel_reestablish".to_string(), channel_id: msg.channel_id}})});
                }
+
                if let Some(their_prev_commitment_point) = self.their_prev_commitment_point {
                        if PublicKey::from_secret_key(&self.secp_ctx, &secp_call!(SecretKey::from_slice(&self.secp_ctx, &msg.per_commitment_secret), "Peer provided an invalid per_commitment_secret", self.channel_id())) != their_prev_commitment_point {
                                return Err(HandleError{err: "Got a revoke commitment secret which didn't correspond to their current pubkey", action: None});
@@ -1843,6 +1880,11 @@ impl Channel {
                self.their_cur_commitment_point = Some(msg.next_per_commitment_point);
                self.cur_remote_commitment_transaction_number -= 1;
                self.received_commitment_while_awaiting_raa = false;
+               if self.channel_state & (ChannelState::MonitorUpdateFailed as u32) == 0 {
+                       // This is a response to our post-monitor-failed unfreeze messages, so we can clear the
+                       // monitor_pending_order requirement as we won't re-send the monitor_pending messages.
+                       self.monitor_pending_order = None;
+               }
 
                let mut to_forward_infos = Vec::new();
                let mut revoked_htlcs = Vec::new();
@@ -1934,6 +1976,17 @@ impl Channel {
                        }
                }
 
+               if (self.channel_state & ChannelState::MonitorUpdateFailed as u32) == ChannelState::MonitorUpdateFailed as u32 {
+                       // We can't actually generate a new commitment transaction (incl by freeing holding
+                       // cells) while we can't update the monitor, so we just return what we have.
+                       if require_commitment {
+                               self.monitor_pending_commitment_signed = true;
+                       }
+                       self.monitor_pending_forwards.append(&mut to_forward_infos);
+                       self.monitor_pending_failures.append(&mut revoked_htlcs);
+                       return Ok((None, Vec::new(), Vec::new(), self.channel_monitor.clone()));
+               }
+
                match self.free_holding_cell_htlcs()? {
                        Some(mut commitment_update) => {
                                commitment_update.0.update_fail_htlcs.reserve(update_fail_htlcs.len());
@@ -1976,7 +2029,7 @@ impl Channel {
                        panic!("Cannot update fee until channel is fully established and we haven't started shutting down");
                }
                if !self.is_live() {
-                       panic!("Cannot update fee while peer is disconnected (ChannelManager should have caught this)");
+                       panic!("Cannot update fee while peer is disconnected/we're awaiting a monitor update (ChannelManager should have caught this)");
                }
 
                if (self.channel_state & (ChannelState::AwaitingRemoteRevoke as u32)) == (ChannelState::AwaitingRemoteRevoke as u32) {
@@ -2068,6 +2121,60 @@ impl Channel {
                outbound_drops
        }
 
+       /// Indicates that a ChannelMonitor update failed to be stored by the client and further
+       /// updates are partially paused.
+       /// This must be called immediately after the call which generated the ChannelMonitor update
+       /// which failed, with the order argument set to the type of call it represented (i.e. a
+       /// commitment update or a revoke_and_ack generation). The messages which were generated from
+       /// that original call must *not* have been sent to the remote end, and must instead have been
+       /// dropped. They will be regenerated when monitor_updating_restored is called.
+       pub fn monitor_update_failed(&mut self, order: RAACommitmentOrder) {
+               assert_eq!(self.channel_state & ChannelState::MonitorUpdateFailed as u32, 0);
+               match order {
+                       RAACommitmentOrder::CommitmentFirst => {
+                               self.monitor_pending_revoke_and_ack = false;
+                               self.monitor_pending_commitment_signed = true;
+                       },
+                       RAACommitmentOrder::RevokeAndACKFirst => {
+                               self.monitor_pending_revoke_and_ack = true;
+                               self.monitor_pending_commitment_signed = false;
+                       },
+               }
+               self.monitor_pending_order = Some(order);
+               self.channel_state |= ChannelState::MonitorUpdateFailed as u32;
+       }
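+       // Illustrative caller-side sketch (see ChannelManager::handle_monitor_update_fail in
+       // channelmanager.rs): if persisting the monitor generated while handling a peer's
+       // commitment_signed fails, the messages we just built are dropped rather than sent and the
+       // channel is frozen:
+       //
+       //   if let Err(e) = monitor.add_update_monitor(chan_monitor.get_funding_txo().unwrap(), chan_monitor) {
+       //       match e {
+       //           ChannelMonitorUpdateErr::TemporaryFailure =>
+       //               chan.monitor_update_failed(RAACommitmentOrder::RevokeAndACKFirst),
+       //           ChannelMonitorUpdateErr::PermanentFailure => { /* force-close the channel */ },
+       //       }
+       //   }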
+
+       /// Indicates that the latest ChannelMonitor update has been committed by the client
+       /// successfully and we should restore normal operation. Returns messages which should be sent
+       /// to the remote side.
+       pub fn monitor_updating_restored(&mut self) -> (Option<msgs::RevokeAndACK>, Option<msgs::CommitmentUpdate>, RAACommitmentOrder, Vec<(PendingForwardHTLCInfo, u64)>, Vec<(HTLCSource, [u8; 32], HTLCFailReason)>) {
+               assert_eq!(self.channel_state & ChannelState::MonitorUpdateFailed as u32, ChannelState::MonitorUpdateFailed as u32);
+               self.channel_state &= !(ChannelState::MonitorUpdateFailed as u32);
+
+               let mut forwards = Vec::new();
+               mem::swap(&mut forwards, &mut self.monitor_pending_forwards);
+               let mut failures = Vec::new();
+               mem::swap(&mut failures, &mut self.monitor_pending_failures);
+
+               if self.channel_state & (ChannelState::PeerDisconnected as u32) != 0 {
+                       // Leave monitor_pending_order so we can order our channel_reestablish responses
+                       self.monitor_pending_revoke_and_ack = false;
+                       self.monitor_pending_commitment_signed = false;
+                       return (None, None, RAACommitmentOrder::RevokeAndACKFirst, forwards, failures);
+               }
+
+               let raa = if self.monitor_pending_revoke_and_ack {
+                       Some(self.get_last_revoke_and_ack())
+               } else { None };
+               let commitment_update = if self.monitor_pending_commitment_signed {
+                       Some(self.get_last_commitment_update())
+               } else { None };
+
+               self.monitor_pending_revoke_and_ack = false;
+               self.monitor_pending_commitment_signed = false;
+               (raa, commitment_update, self.monitor_pending_order.clone().unwrap(), forwards, failures)
+       }
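+       // Illustrative restore-side sketch (see ChannelManager::test_restore_channel_monitor in
+       // channelmanager.rs): once the client has successfully persisted the latest monitor,
+       //
+       //   let (raa, commitment_update, order, forwards, failures) = chan.monitor_updating_restored();
+       //   // queue `raa` and `commitment_update` for the peer, respecting `order`, then process
+       //   // `forwards` and fail `failures` backwards.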
+
        pub fn update_fee(&mut self, fee_estimator: &FeeEstimator, msg: &msgs::UpdateFee) -> Result<(), ChannelError> {
                if self.channel_outbound {
                        return Err(ChannelError::Close("Non-funding remote tried to update channel fee"));
@@ -2082,6 +2189,71 @@ impl Channel {
                Ok(())
        }
 
+       fn get_last_revoke_and_ack(&self) -> msgs::RevokeAndACK {
+               let next_per_commitment_point = PublicKey::from_secret_key(&self.secp_ctx, &self.build_local_commitment_secret(self.cur_local_commitment_transaction_number));
+               let per_commitment_secret = chan_utils::build_commitment_secret(self.local_keys.commitment_seed, self.cur_local_commitment_transaction_number + 2);
+               msgs::RevokeAndACK {
+                       channel_id: self.channel_id,
+                       per_commitment_secret,
+                       next_per_commitment_point,
+               }
+       }
+
+       fn get_last_commitment_update(&self) -> msgs::CommitmentUpdate {
+               let mut update_add_htlcs = Vec::new();
+               let mut update_fulfill_htlcs = Vec::new();
+               let mut update_fail_htlcs = Vec::new();
+               let mut update_fail_malformed_htlcs = Vec::new();
+
+               for htlc in self.pending_outbound_htlcs.iter() {
+                       if let &OutboundHTLCState::LocalAnnounced(ref onion_packet) = &htlc.state {
+                               update_add_htlcs.push(msgs::UpdateAddHTLC {
+                                       channel_id: self.channel_id(),
+                                       htlc_id: htlc.htlc_id,
+                                       amount_msat: htlc.amount_msat,
+                                       payment_hash: htlc.payment_hash,
+                                       cltv_expiry: htlc.cltv_expiry,
+                                       onion_routing_packet: (**onion_packet).clone(),
+                               });
+                       }
+               }
+
+               for htlc in self.pending_inbound_htlcs.iter() {
+                       if let &InboundHTLCState::LocalRemoved(ref reason) = &htlc.state {
+                               match reason {
+                                       &InboundHTLCRemovalReason::FailRelay(ref err_packet) => {
+                                               update_fail_htlcs.push(msgs::UpdateFailHTLC {
+                                                       channel_id: self.channel_id(),
+                                                       htlc_id: htlc.htlc_id,
+                                                       reason: err_packet.clone()
+                                               });
+                                       },
+                                       &InboundHTLCRemovalReason::FailMalformed((ref sha256_of_onion, ref failure_code)) => {
+                                               update_fail_malformed_htlcs.push(msgs::UpdateFailMalformedHTLC {
+                                                       channel_id: self.channel_id(),
+                                                       htlc_id: htlc.htlc_id,
+                                                       sha256_of_onion: sha256_of_onion.clone(),
+                                                       failure_code: failure_code.clone(),
+                                               });
+                                       },
+                                       &InboundHTLCRemovalReason::Fulfill(ref payment_preimage) => {
+                                               update_fulfill_htlcs.push(msgs::UpdateFulfillHTLC {
+                                                       channel_id: self.channel_id(),
+                                                       htlc_id: htlc.htlc_id,
+                                                       payment_preimage: payment_preimage.clone(),
+                                               });
+                                       },
+                               }
+                       }
+               }
+
+               msgs::CommitmentUpdate {
+                       update_add_htlcs, update_fulfill_htlcs, update_fail_htlcs, update_fail_malformed_htlcs,
+                       update_fee: None, //TODO: We need to support re-generating any update_fees in the last commitment_signed!
+                       commitment_signed: self.send_commitment_no_state_update().expect("It looks like we failed to re-generate a commitment_signed we had previously sent?").0,
+               }
+       }
+
        /// May panic if some calls other than message-handling calls (which will all Err immediately)
        /// have been called between remove_uncommitted_htlcs_and_mark_paused and this call.
        pub fn channel_reestablish(&mut self, msg: &msgs::ChannelReestablish) -> Result<(Option<msgs::FundingLocked>, Option<msgs::RevokeAndACK>, Option<msgs::CommitmentUpdate>, Option<ChannelMonitor>, RAACommitmentOrder), ChannelError> {
@@ -2106,13 +2278,12 @@ impl Channel {
                        // Note that if we need to repeat our FundingLocked we'll do that in the next if block.
                        None
                } else if msg.next_remote_commitment_number == (INITIAL_COMMITMENT_NUMBER - 1) - self.cur_local_commitment_transaction_number {
-                       let next_per_commitment_point = PublicKey::from_secret_key(&self.secp_ctx, &self.build_local_commitment_secret(self.cur_local_commitment_transaction_number));
-                       let per_commitment_secret = chan_utils::build_commitment_secret(self.local_keys.commitment_seed, self.cur_local_commitment_transaction_number + 2);
-                       Some(msgs::RevokeAndACK {
-                               channel_id: self.channel_id,
-                               per_commitment_secret,
-                               next_per_commitment_point,
-                       })
+                       if self.channel_state & (ChannelState::MonitorUpdateFailed as u32) != 0 {
+                               self.monitor_pending_revoke_and_ack = true;
+                               None
+                       } else {
+                               Some(self.get_last_revoke_and_ack())
+                       }
                } else {
                        return Err(ChannelError::Close("Peer attempted to reestablish channel with a very old local commitment transaction"));
                };
@@ -2124,6 +2295,7 @@ impl Channel {
                let our_next_remote_commitment_number = INITIAL_COMMITMENT_NUMBER - self.cur_remote_commitment_transaction_number + if (self.channel_state & ChannelState::AwaitingRemoteRevoke as u32) != 0 { 1 } else { 0 };
 
                let resend_funding_locked = if msg.next_local_commitment_number == 1 && INITIAL_COMMITMENT_NUMBER - self.cur_local_commitment_transaction_number == 1 {
+                       // We should never have to worry about MonitorUpdateFailed resending FundingLocked
                        let next_per_commitment_secret = self.build_local_commitment_secret(self.cur_local_commitment_transaction_number);
                        let next_per_commitment_point = PublicKey::from_secret_key(&self.secp_ctx, &next_per_commitment_secret);
                        Some(msgs::FundingLocked {
@@ -2132,11 +2304,11 @@ impl Channel {
                        })
                } else { None };
 
-               let order = if self.received_commitment_while_awaiting_raa {
-                       RAACommitmentOrder::CommitmentFirst
-               } else {
-                       RAACommitmentOrder::RevokeAndACKFirst
-               };
+               let order = self.monitor_pending_order.clone().unwrap_or(if self.received_commitment_while_awaiting_raa {
+                               RAACommitmentOrder::CommitmentFirst
+                       } else {
+                               RAACommitmentOrder::RevokeAndACKFirst
+                       });
 
                if msg.next_local_commitment_number == our_next_remote_commitment_number {
                        if required_revoke.is_some() {
@@ -2145,7 +2317,8 @@ impl Channel {
                                log_debug!(self, "Reconnected channel {} with no loss", log_bytes!(self.channel_id()));
                        }
 
-                       if (self.channel_state & (ChannelState::AwaitingRemoteRevoke as u32)) == 0 {
+                       if (self.channel_state & (ChannelState::AwaitingRemoteRevoke as u32 | ChannelState::MonitorUpdateFailed as u32)) == 0 &&
+                                       self.monitor_pending_order.is_none() { // monitor_pending_order indicates we're waiting on a response to an unfreeze
                                // We're up-to-date and not waiting on a remote revoke (if we are our
                                // channel_reestablish should result in them sending a revoke_and_ack), but we may
                                // have received some updates while we were disconnected. Free the holding cell
@@ -2172,59 +2345,16 @@ impl Channel {
                        } else {
                                log_debug!(self, "Reconnected channel {} with only lost remote commitment tx", log_bytes!(self.channel_id()));
                        }
-                       let mut update_add_htlcs = Vec::new();
-                       let mut update_fulfill_htlcs = Vec::new();
-                       let mut update_fail_htlcs = Vec::new();
-                       let mut update_fail_malformed_htlcs = Vec::new();
-
-                       for htlc in self.pending_outbound_htlcs.iter() {
-                               if let &OutboundHTLCState::LocalAnnounced(ref onion_packet) = &htlc.state {
-                                       update_add_htlcs.push(msgs::UpdateAddHTLC {
-                                               channel_id: self.channel_id(),
-                                               htlc_id: htlc.htlc_id,
-                                               amount_msat: htlc.amount_msat,
-                                               payment_hash: htlc.payment_hash,
-                                               cltv_expiry: htlc.cltv_expiry,
-                                               onion_routing_packet: (**onion_packet).clone(),
-                                       });
-                               }
-                       }
 
-                       for htlc in self.pending_inbound_htlcs.iter() {
-                               if let &InboundHTLCState::LocalRemoved(ref reason) = &htlc.state {
-                                       match reason {
-                                               &InboundHTLCRemovalReason::FailRelay(ref err_packet) => {
-                                                       update_fail_htlcs.push(msgs::UpdateFailHTLC {
-                                                               channel_id: self.channel_id(),
-                                                               htlc_id: htlc.htlc_id,
-                                                               reason: err_packet.clone()
-                                                       });
-                                               },
-                                               &InboundHTLCRemovalReason::FailMalformed((ref sha256_of_onion, ref failure_code)) => {
-                                                       update_fail_malformed_htlcs.push(msgs::UpdateFailMalformedHTLC {
-                                                               channel_id: self.channel_id(),
-                                                               htlc_id: htlc.htlc_id,
-                                                               sha256_of_onion: sha256_of_onion.clone(),
-                                                               failure_code: failure_code.clone(),
-                                                       });
-                                               },
-                                               &InboundHTLCRemovalReason::Fulfill(ref payment_preimage) => {
-                                                       update_fulfill_htlcs.push(msgs::UpdateFulfillHTLC {
-                                                               channel_id: self.channel_id(),
-                                                               htlc_id: htlc.htlc_id,
-                                                               payment_preimage: payment_preimage.clone(),
-                                                       });
-                                               },
-                                       }
-                               }
+                       // If monitor_pending_order is set, it must be CommitmentFirst if we have no RAA to resend
+                       debug_assert!(self.monitor_pending_order != Some(RAACommitmentOrder::RevokeAndACKFirst) || required_revoke.is_some());
+
+                       if self.channel_state & (ChannelState::MonitorUpdateFailed as u32) != 0 {
+                               self.monitor_pending_commitment_signed = true;
+                               return Ok((resend_funding_locked, None, None, None, order));
                        }
 
-                       return Ok((resend_funding_locked, required_revoke,
-                                       Some(msgs::CommitmentUpdate {
-                                               update_add_htlcs, update_fulfill_htlcs, update_fail_htlcs, update_fail_malformed_htlcs,
-                                               update_fee: None, //TODO: We need to support re-generating any update_fees in the last commitment_signed!
-                                               commitment_signed: self.send_commitment_no_state_update().expect("It looks like we failed to re-generate a commitment_signed we had previously sent?").0,
-                                       }), None, order));
+                       return Ok((resend_funding_locked, required_revoke, Some(self.get_last_commitment_update()), None, order));
                } else {
                        return Err(ChannelError::Close("Peer attempted to reestablish channel with a very old remote commitment transaction"));
                }
@@ -2561,7 +2691,13 @@ impl Channel {
        /// is_usable() and considers things like the channel being temporarily disabled.
        /// Allowed in any state (including after shutdown)
        pub fn is_live(&self) -> bool {
-               self.is_usable() && (self.channel_state & (ChannelState::PeerDisconnected as u32) == 0)
+               self.is_usable() && (self.channel_state & (ChannelState::PeerDisconnected as u32 | ChannelState::MonitorUpdateFailed as u32) == 0)
+       }
+
+       /// Returns true if this channel has been marked as awaiting a monitor update to move forward.
+       /// Allowed in any state (including after shutdown)
+       pub fn is_awaiting_monitor_update(&self) -> bool {
+               (self.channel_state & ChannelState::MonitorUpdateFailed as u32) != 0
        }
 
        /// Returns true if funding_created was sent/received.
@@ -2875,14 +3011,14 @@ impl Channel {
                        return Err(HandleError{err: "Cannot send less than their minimum HTLC value", action: None});
                }
 
-               if (self.channel_state & (ChannelState::PeerDisconnected as u32)) == (ChannelState::PeerDisconnected as u32) {
+               if (self.channel_state & (ChannelState::PeerDisconnected as u32 | ChannelState::MonitorUpdateFailed as u32)) != 0 {
                        // Note that this should never really happen, if we're !is_live() on receipt of an
                        // incoming HTLC for relay will result in us rejecting the HTLC and we won't allow
                        // the user to send directly into a !is_live() channel. However, if we
                        // disconnected during the time the previous hop was doing the commitment dance we may
                        // end up getting here after the forwarding delay. In any case, returning an
                        // IgnoreError will get ChannelManager to do the right thing and fail backwards now.
-                       return Err(HandleError{err: "Cannot send an HTLC while disconnected", action: Some(ErrorAction::IgnoreError)});
+                       return Err(HandleError{err: "Cannot send an HTLC while disconnected/frozen for channel monitor update", action: Some(ErrorAction::IgnoreError)});
                }
 
                let (outbound_htlc_count, htlc_outbound_value_msat) = self.get_outbound_pending_htlc_stats();
@@ -2964,6 +3100,9 @@ impl Channel {
                if (self.channel_state & (ChannelState::PeerDisconnected as u32)) == (ChannelState::PeerDisconnected as u32) {
                        panic!("Cannot create commitment tx while disconnected, as send_htlc will have returned an Err so a send_commitment precondition has been violated");
                }
+               if (self.channel_state & (ChannelState::MonitorUpdateFailed as u32)) == (ChannelState::MonitorUpdateFailed as u32) {
+                       panic!("Cannot create commitment tx while awaiting monitor update unfreeze, as send_htlc will have returned an Err so a send_commitment precondition has been violated");
+               }
                let mut have_updates = self.pending_update_fee.is_some();
                for htlc in self.pending_outbound_htlcs.iter() {
                        if let OutboundHTLCState::LocalAnnounced(_) = htlc.state {
@@ -3072,8 +3211,8 @@ impl Channel {
                        }
                }
                assert_eq!(self.channel_state & ChannelState::ShutdownComplete as u32, 0);
-               if self.channel_state & (ChannelState::PeerDisconnected as u32) == ChannelState::PeerDisconnected as u32 {
-                       return Err(APIError::ChannelUnavailable{err: "Cannot begin shutdown while peer is disconnected, maybe force-close instead?"});
+               if self.channel_state & (ChannelState::PeerDisconnected as u32 | ChannelState::MonitorUpdateFailed as u32) != 0 {
+                       return Err(APIError::ChannelUnavailable{err: "Cannot begin shutdown while peer is disconnected or we're waiting on a monitor update, maybe force-close instead?"});
                }
 
                let our_closing_script = self.get_closing_scriptpubkey();
index 774fdf6aa6ae72339814f1cb5d3bae1bfe4c9c47..d6ba67c2b36dcf24d23e3c880ccf64ddcb5a7b19 100644 (file)
@@ -23,7 +23,7 @@ use secp256k1;
 use chain::chaininterface::{BroadcasterInterface,ChainListener,ChainWatchInterface,FeeEstimator};
 use chain::transaction::OutPoint;
 use ln::channel::{Channel, ChannelError, ChannelKeys};
-use ln::channelmonitor::{ManyChannelMonitor, CLTV_CLAIM_BUFFER, HTLC_FAIL_TIMEOUT_BLOCKS};
+use ln::channelmonitor::{ChannelMonitorUpdateErr, ManyChannelMonitor, CLTV_CLAIM_BUFFER, HTLC_FAIL_TIMEOUT_BLOCKS};
 use ln::router::{Route,RouteHop};
 use ln::msgs;
 use ln::msgs::{ChannelMessageHandler, HandleError, RAACommitmentOrder};
@@ -586,6 +586,33 @@ impl ChannelManager {
                }
        }
 
+       fn handle_monitor_update_fail(&self, mut channel_state_lock: MutexGuard<ChannelHolder>, channel_id: &[u8; 32], err: ChannelMonitorUpdateErr, reason: RAACommitmentOrder) {
+               match err {
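+                       // The client will never be able to store this monitor update: the channel cannot be
+                       // safely advanced any further, so remove it and force-close.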
+                       ChannelMonitorUpdateErr::PermanentFailure => {
+                               let mut chan = {
+                                       let channel_state = channel_state_lock.borrow_parts();
+                                       let chan = channel_state.by_id.remove(channel_id).expect("monitor_update_failed must be called within the same lock as the channel get!");
+                                       if let Some(short_id) = chan.get_short_channel_id() {
+                                               channel_state.short_to_id.remove(&short_id);
+                                       }
+                                       chan
+                               };
+                               mem::drop(channel_state_lock);
+                               self.finish_force_close_channel(chan.force_shutdown());
+                               let mut events = self.pending_events.lock().unwrap();
+                               if let Ok(update) = self.get_channel_update(&chan) {
+                                       events.push(events::Event::BroadcastChannelUpdate {
+                                               msg: update
+                                       });
+                               }
+                       },
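+                       // The client may be able to store the monitor later: freeze the channel until
+                       // test_restore_channel_monitor is called.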
+                       ChannelMonitorUpdateErr::TemporaryFailure => {
+                               let channel = channel_state_lock.by_id.get_mut(channel_id).expect("monitor_update_failed must be called within the same lock as the channel get!");
+                               channel.monitor_update_failed(reason);
+                       },
+               }
+       }
+
        #[inline]
        fn gen_rho_mu_from_shared_secret(shared_secret: &SharedSecret) -> ([u8; 32], [u8; 32]) {
                ({
@@ -984,6 +1011,11 @@ impl ChannelManager {
                                if let Some((err, code, chan_update)) = loop {
                                        let chan = channel_state.as_mut().unwrap().by_id.get_mut(&forwarding_id).unwrap();
 
+                                       // Note that we could technically not return an error yet here and just hope
+                                       // that the connection is reestablished or monitor updated by the time we get
+                                       // around to doing the actual forward, but better to fail early if we can and
+                                       // hopefully an attacker trying to path-trace payments cannot make this occur
+                                       // on a small/per-node/per-channel scale.
                                        if !chan.is_live() { // channel_disabled
                                                break Some(("Forwarding channel is not in a ready state.", 0x1000 | 20, Some(self.get_channel_update(chan).unwrap())));
                                        }
@@ -1027,6 +1059,7 @@ impl ChannelManager {
        }
 
        /// only fails if the channel does not yet have an assigned short_id
+       /// May be called with channel_state already locked!
        fn get_channel_update(&self, chan: &Channel) -> Result<msgs::ChannelUpdate, HandleError> {
                let short_channel_id = match chan.get_short_channel_id() {
                        None => return Err(HandleError{err: "Channel not yet established", action: None}),
@@ -1096,9 +1129,8 @@ impl ChannelManager {
                let (onion_payloads, htlc_msat, htlc_cltv) = ChannelManager::build_onion_payloads(&route, cur_height)?;
                let onion_packet = ChannelManager::construct_onion_packet(onion_payloads, onion_keys, &payment_hash);
 
-               let (first_hop_node_id, (update_add, commitment_signed, chan_monitor)) = {
-                       let mut channel_state_lock = self.channel_state.lock().unwrap();
-                       let channel_state = channel_state_lock.borrow_parts();
+               let (first_hop_node_id, update_add, commitment_signed) = {
+                       let mut channel_state = self.channel_state.lock().unwrap();
 
                        let id = match channel_state.short_to_id.get(&route.hops.first().unwrap().short_channel_id) {
                                None => return Err(APIError::ChannelUnavailable{err: "No channel available with first hop!"}),
@@ -1106,32 +1138,45 @@ impl ChannelManager {
                        };
 
                        let res = {
-                               let chan = channel_state.by_id.get_mut(&id).unwrap();
-                               if chan.get_their_node_id() != route.hops.first().unwrap().pubkey {
-                                       return Err(APIError::RouteError{err: "Node ID mismatch on first hop!"});
-                               }
-                               if !chan.is_live() {
-                                       return Err(APIError::ChannelUnavailable{err: "Peer for first hop currently disconnected!"});
+                               let res = {
+                                       let chan = channel_state.by_id.get_mut(&id).unwrap();
+                                       if chan.get_their_node_id() != route.hops.first().unwrap().pubkey {
+                                               return Err(APIError::RouteError{err: "Node ID mismatch on first hop!"});
+                                       }
+                                       if chan.is_awaiting_monitor_update() {
+                                               return Err(APIError::MonitorUpdateFailed);
+                                       }
+                                       if !chan.is_live() {
+                                               return Err(APIError::ChannelUnavailable{err: "Peer for first hop currently disconnected!"});
+                                       }
+                                       chan.send_htlc_and_commit(htlc_msat, payment_hash.clone(), htlc_cltv, HTLCSource::OutboundRoute {
+                                               route: route.clone(),
+                                               session_priv: session_priv.clone(),
+                                               first_hop_htlc_msat: htlc_msat,
+                                       }, onion_packet).map_err(|he| APIError::ChannelUnavailable{err: he.err})?
+                               };
+                               match res {
+                                       Some((update_add, commitment_signed, chan_monitor)) => {
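+                                               // Per Channel::monitor_update_failed's contract, if storing the monitor
+                                               // fails we must drop the freshly-generated update_add/commitment_signed
+                                               // rather than send them; they'll be regenerated on restore.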
+                                               if let Err(e) = self.monitor.add_update_monitor(chan_monitor.get_funding_txo().unwrap(), chan_monitor) {
+                                                       self.handle_monitor_update_fail(channel_state, &id, e, RAACommitmentOrder::CommitmentFirst);
+                                                       return Err(APIError::MonitorUpdateFailed);
+                                               }
+                                               Some((update_add, commitment_signed))
+                                       },
+                                       None => None,
                                }
-                               chan.send_htlc_and_commit(htlc_msat, payment_hash.clone(), htlc_cltv, HTLCSource::OutboundRoute {
-                                       route: route.clone(),
-                                       session_priv: session_priv.clone(),
-                                       first_hop_htlc_msat: htlc_msat,
-                               }, onion_packet).map_err(|he| APIError::ChannelUnavailable{err: he.err})?
                        };
 
                        let first_hop_node_id = route.hops.first().unwrap().pubkey;
 
                        match res {
-                               Some(msgs) => (first_hop_node_id, msgs),
+                               Some((update_add, commitment_signed)) => {
+                                       (first_hop_node_id, update_add, commitment_signed)
+                               },
                                None => return Ok(()),
                        }
                };
 
-               if let Err(_e) = self.monitor.add_update_monitor(chan_monitor.get_funding_txo().unwrap(), chan_monitor) {
-                       unimplemented!();
-               }
-
                let mut events = self.pending_events.lock().unwrap();
                events.push(events::Event::UpdateHTLCs {
                        node_id: first_hop_node_id,
@@ -1184,7 +1229,9 @@ impl ChannelManager {
                                },
                                None => return
                        }
-               }; // Release channel lock for install_watch_outpoint call,
+               };
+               // Because we have exclusive ownership of the channel here we can release the channel_state
+               // lock before add_update_monitor
                if let Err(_e) = self.monitor.add_update_monitor(chan_monitor.get_funding_txo().unwrap(), chan_monitor) {
                        unimplemented!();
                }
@@ -1299,7 +1346,10 @@ impl ChannelManager {
                                                                continue;
                                                        },
                                                };
-                                               new_events.push((Some(monitor), events::Event::UpdateHTLCs {
+                                               if let Err(_e) = self.monitor.add_update_monitor(monitor.get_funding_txo().unwrap(), monitor) {
+                                                       unimplemented!();// but def dont push the event...
+                                               }
+                                               new_events.push(events::Event::UpdateHTLCs {
                                                        node_id: forward_chan.get_their_node_id(),
                                                        updates: msgs::CommitmentUpdate {
                                                                update_add_htlcs: add_htlc_msgs,
@@ -1309,7 +1359,7 @@ impl ChannelManager {
                                                                update_fee: None,
                                                                commitment_signed: commitment_msg,
                                                        },
-                                               }));
+                                               });
                                        }
                                } else {
                                        for HTLCForwardInfo { prev_short_channel_id, prev_htlc_id, forward_info } in pending_forwards.drain(..) {
@@ -1322,10 +1372,10 @@ impl ChannelManager {
                                                        hash_map::Entry::Occupied(mut entry) => entry.get_mut().push(prev_hop_data),
                                                        hash_map::Entry::Vacant(entry) => { entry.insert(vec![prev_hop_data]); },
                                                };
-                                               new_events.push((None, events::Event::PaymentReceived {
+                                               new_events.push(events::Event::PaymentReceived {
                                                        payment_hash: forward_info.payment_hash,
                                                        amt: forward_info.amt_to_forward,
-                                               }));
+                                               });
                                        }
                                }
                        }
@@ -1339,21 +1389,8 @@ impl ChannelManager {
                }
 
                if new_events.is_empty() { return }
-
-               new_events.retain(|event| {
-                       if let &Some(ref monitor) = &event.0 {
-                               if let Err(_e) = self.monitor.add_update_monitor(monitor.get_funding_txo().unwrap(), monitor.clone()) {
-                                       unimplemented!();// but def dont push the event...
-                               }
-                       }
-                       true
-               });
-
                let mut events = self.pending_events.lock().unwrap();
-               events.reserve(new_events.len());
-               for event in new_events.drain(..) {
-                       events.push(event.1);
-               }
+               events.append(&mut new_events);
        }
 
        /// Indicates that the preimage for payment_hash is unknown after a PaymentReceived event.
@@ -1416,7 +1453,13 @@ impl ChannelManager {
 
                                        let chan = channel_state.by_id.get_mut(&chan_id).unwrap();
                                        match chan.get_update_fail_htlc_and_commit(htlc_id, err_packet) {
-                                               Ok(msg) => (chan.get_their_node_id(), msg),
+                                               Ok(Some((msg, commitment_msg, chan_monitor))) => {
+                                                       if let Err(_e) = self.monitor.add_update_monitor(chan_monitor.get_funding_txo().unwrap(), chan_monitor) {
+                                                               unimplemented!();
+                                                       }
+                                                       (chan.get_their_node_id(), Some((msg, commitment_msg)))
+                                               },
+                                               Ok(None) => (chan.get_their_node_id(), None),
                                                Err(_e) => {
                                                        //TODO: Do something with e?
                                                        return;
@@ -1425,13 +1468,9 @@ impl ChannelManager {
                                };
 
                                match fail_msgs {
-                                       Some((msg, commitment_msg, chan_monitor)) => {
+                                       Some((msg, commitment_msg)) => {
                                                mem::drop(channel_state);
 
-                                               if let Err(_e) = self.monitor.add_update_monitor(chan_monitor.get_funding_txo().unwrap(), chan_monitor) {
-                                                       unimplemented!();// but def dont push the event...
-                                               }
-
                                                let mut pending_events = self.pending_events.lock().unwrap();
                                                pending_events.push(events::Event::UpdateHTLCs {
                                                        node_id,
@@ -1496,7 +1535,13 @@ impl ChannelManager {
 
                                        let chan = channel_state.by_id.get_mut(&chan_id).unwrap();
                                        match chan.get_update_fulfill_htlc_and_commit(htlc_id, payment_preimage) {
-                                               Ok(msg) => (chan.get_their_node_id(), msg),
+                                               Ok((msgs, Some(chan_monitor))) => {
+                                                       if let Err(_e) = self.monitor.add_update_monitor(chan_monitor.get_funding_txo().unwrap(), chan_monitor) {
+                                                               unimplemented!();// but def dont push the event...
+                                                       }
+                                                       (chan.get_their_node_id(), msgs)
+                                               },
+                                               Ok((msgs, None)) => (chan.get_their_node_id(), msgs),
                                                Err(_e) => {
                                                        // TODO: There is probably a channel manager somewhere that needs to
                                                        // learn the preimage as the channel may be about to hit the chain.
@@ -1507,13 +1552,7 @@ impl ChannelManager {
                                };
 
                                mem::drop(channel_state);
-                               if let Some(chan_monitor) = fulfill_msgs.1 {
-                                       if let Err(_e) = self.monitor.add_update_monitor(chan_monitor.get_funding_txo().unwrap(), chan_monitor) {
-                                               unimplemented!();// but def dont push the event...
-                                       }
-                               }
-
-                               if let Some((msg, commitment_msg)) = fulfill_msgs.0 {
+                               if let Some((msg, commitment_msg)) = fulfill_msgs {
                                        let mut pending_events = self.pending_events.lock().unwrap();
                                        pending_events.push(events::Event::UpdateHTLCs {
                                                node_id: node_id,
@@ -1540,7 +1579,83 @@ impl ChannelManager {
        /// ChannelMonitorUpdateErr::TemporaryFailure was returned from a channel monitor update
        /// operation.
        pub fn test_restore_channel_monitor(&self) {
-               unimplemented!();
+               let mut new_events = Vec::new();
+               let mut close_results = Vec::new();
+               let mut htlc_forwards = Vec::new();
+               let mut htlc_failures = Vec::new();
+
+               {
+                       let mut channel_lock = self.channel_state.lock().unwrap();
+                       let channel_state = channel_lock.borrow_parts();
+                       let short_to_id = channel_state.short_to_id;
+                       channel_state.by_id.retain(|_, channel| {
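+                               // Re-attempt persisting the latest monitor for each frozen channel; returning false
+                               // from this closure drops channels whose monitor failed permanently.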
+                               if channel.is_awaiting_monitor_update() {
+                                       let chan_monitor = channel.channel_monitor();
+                                       if let Err(e) = self.monitor.add_update_monitor(chan_monitor.get_funding_txo().unwrap(), chan_monitor) {
+                                               match e {
+                                                       ChannelMonitorUpdateErr::PermanentFailure => {
+                                                               if let Some(short_id) = channel.get_short_channel_id() {
+                                                                       short_to_id.remove(&short_id);
+                                                               }
+                                                               close_results.push(channel.force_shutdown());
+                                                               if let Ok(update) = self.get_channel_update(&channel) {
+                                                                       new_events.push(events::Event::BroadcastChannelUpdate {
+                                                                               msg: update
+                                                                       });
+                                                               }
+                                                               false
+                                                       },
+                                                       ChannelMonitorUpdateErr::TemporaryFailure => true,
+                                               }
+                                       } else {
+                                               let (raa, commitment_update, order, pending_forwards, mut pending_failures) = channel.monitor_updating_restored();
+                                               if !pending_forwards.is_empty() {
+                                                       htlc_forwards.push((channel.get_short_channel_id().expect("We can't have pending forwards before funding confirmation"), pending_forwards));
+                                               }
+                                               htlc_failures.append(&mut pending_failures);
+
+                                               macro_rules! handle_cs { () => {
+                                                       if let Some(update) = commitment_update {
+                                                               new_events.push(events::Event::UpdateHTLCs {
+                                                                       node_id: channel.get_their_node_id(),
+                                                                       updates: update,
+                                                               });
+                                                       }
+                                               } }
+                                               macro_rules! handle_raa { () => {
+                                                       if let Some(revoke_and_ack) = raa {
+                                                               new_events.push(events::Event::SendRevokeAndACK {
+                                                                       node_id: channel.get_their_node_id(),
+                                                                       msg: revoke_and_ack,
+                                                               });
+                                                       }
+                                               } }
+                                               match order {
+                                                       RAACommitmentOrder::CommitmentFirst => {
+                                                               handle_cs!();
+                                                               handle_raa!();
+                                                       },
+                                                       RAACommitmentOrder::RevokeAndACKFirst => {
+                                                               handle_raa!();
+                                                               handle_cs!();
+                                                       },
+                                               }
+                                               true
+                                       }
+                               } else { true }
+                       });
+               }
+
+               for failure in htlc_failures.drain(..) {
+                       self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), failure.0, &failure.1, failure.2);
+               }
+               self.forward_htlcs(&mut htlc_forwards[..]);
+
+               for res in close_results.drain(..) {
+                       self.finish_force_close_channel(res);
+               }
+
+               self.pending_events.lock().unwrap().append(&mut new_events);
        }
 
        fn internal_open_channel(&self, their_node_id: &PublicKey, msg: &msgs::OpenChannel) -> Result<msgs::AcceptChannel, MsgHandleErrInternal> {
@@ -1626,10 +1741,9 @@ impl ChannelManager {
                                },
                                hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel", msg.temporary_channel_id))
                        }
-               }; // Release channel lock for install_watch_outpoint call,
-                  // note that this means if the remote end is misbehaving and sends a message for the same
-                  // channel back-to-back with funding_created, we'll end up thinking they sent a message
-                  // for a bogus channel.
+               };
+               // Because we have exclusive ownership of the channel here, we can release the
+               // channel_state lock before calling add_update_monitor.
                if let Err(_e) = self.monitor.add_update_monitor(monitor_update.get_funding_txo().unwrap(), monitor_update) {
                        unimplemented!();
                }
@@ -1646,7 +1760,7 @@ impl ChannelManager {
        }
 
        fn internal_funding_signed(&self, their_node_id: &PublicKey, msg: &msgs::FundingSigned) -> Result<(), MsgHandleErrInternal> {
-               let (funding_txo, user_id, monitor) = {
+               let (funding_txo, user_id) = {
                        let mut channel_state = self.channel_state.lock().unwrap();
                        match channel_state.by_id.get_mut(&msg.channel_id) {
                                Some(chan) => {
@@ -1655,14 +1769,14 @@ impl ChannelManager {
                                                return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!", msg.channel_id));
                                        }
                                        let chan_monitor = chan.funding_signed(&msg).map_err(|e| MsgHandleErrInternal::from_maybe_close(e))?;
-                                       (chan.get_funding_txo().unwrap(), chan.get_user_id(), chan_monitor)
+                                       if let Err(_e) = self.monitor.add_update_monitor(chan_monitor.get_funding_txo().unwrap(), chan_monitor) {
+                                               unimplemented!();
+                                       }
+                                       (chan.get_funding_txo().unwrap(), chan.get_user_id())
                                },
                                None => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel", msg.channel_id))
                        }
                };
-               if let Err(_e) = self.monitor.add_update_monitor(monitor.get_funding_txo().unwrap(), monitor) {
-                       unimplemented!();
-               }
                let mut pending_events = self.pending_events.lock().unwrap();
                pending_events.push(events::Event::FundingBroadcastSafe {
                        funding_txo: funding_txo,
@@ -2042,7 +2156,7 @@ impl ChannelManager {
        }
 
        fn internal_commitment_signed(&self, their_node_id: &PublicKey, msg: &msgs::CommitmentSigned) -> Result<(msgs::RevokeAndACK, Option<msgs::CommitmentSigned>), MsgHandleErrInternal> {
-               let (revoke_and_ack, commitment_signed, chan_monitor) = {
+               let (revoke_and_ack, commitment_signed) = {
                        let mut channel_state = self.channel_state.lock().unwrap();
                        match channel_state.by_id.get_mut(&msg.channel_id) {
                                Some(chan) => {
@@ -2050,20 +2164,53 @@ impl ChannelManager {
                                                //TODO: here and below MsgHandleErrInternal, #153 case
                                                return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!", msg.channel_id));
                                        }
-                                       chan.commitment_signed(&msg).map_err(|e| MsgHandleErrInternal::from_maybe_close(e))?
+                                       let (revoke_and_ack, commitment_signed, chan_monitor) = chan.commitment_signed(&msg).map_err(|e| MsgHandleErrInternal::from_maybe_close(e))?;
+                                       if let Err(_e) = self.monitor.add_update_monitor(chan_monitor.get_funding_txo().unwrap(), chan_monitor) {
+                                               unimplemented!();
+                                       }
+                                       (revoke_and_ack, commitment_signed)
                                },
                                None => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel", msg.channel_id))
                        }
                };
-               if let Err(_e) = self.monitor.add_update_monitor(chan_monitor.get_funding_txo().unwrap(), chan_monitor) {
-                       unimplemented!();
-               }
-
                Ok((revoke_and_ack, commitment_signed))
        }
 
+       #[inline]
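+       /// Queues the given per-source pending forwards into channel_state.forward_htlcs, emitting a
+       /// PendingHTLCsForwardable event if the forward set was previously empty (ie no forwarding
+       /// timer was already pending).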
+       fn forward_htlcs(&self, per_source_pending_forwards: &mut [(u64, Vec<(PendingForwardHTLCInfo, u64)>)]) {
+               for &mut (prev_short_channel_id, ref mut pending_forwards) in per_source_pending_forwards {
+                       let mut forward_event = None;
+                       if !pending_forwards.is_empty() {
+                               let mut channel_state = self.channel_state.lock().unwrap();
+                               if channel_state.forward_htlcs.is_empty() {
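+                                       // No forwards were pending, so schedule a new forwarding event a random
+                                       // 1-5x MIN_HTLC_RELAY_HOLDING_CELL_MILLIS in the future, presumably so
+                                       // relayed HTLCs get batched rather than forwarded one at a time.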
+                                       forward_event = Some(Instant::now() + Duration::from_millis(((rng::rand_f32() * 4.0 + 1.0) * MIN_HTLC_RELAY_HOLDING_CELL_MILLIS as f32) as u64));
+                                       channel_state.next_forward = forward_event.unwrap();
+                               }
+                               for (forward_info, prev_htlc_id) in pending_forwards.drain(..) {
+                                       match channel_state.forward_htlcs.entry(forward_info.short_channel_id) {
+                                               hash_map::Entry::Occupied(mut entry) => {
+                                                       entry.get_mut().push(HTLCForwardInfo { prev_short_channel_id, prev_htlc_id, forward_info });
+                                               },
+                                               hash_map::Entry::Vacant(entry) => {
+                                                       entry.insert(vec!(HTLCForwardInfo { prev_short_channel_id, prev_htlc_id, forward_info }));
+                                               }
+                                       }
+                               }
+                       }
+                       match forward_event {
+                               Some(time) => {
+                                       let mut pending_events = self.pending_events.lock().unwrap();
+                                       pending_events.push(events::Event::PendingHTLCsForwardable {
+                                               time_forwardable: time
+                                       });
+                               }
+                               None => {},
+                       }
+               }
+       }
+
        fn internal_revoke_and_ack(&self, their_node_id: &PublicKey, msg: &msgs::RevokeAndACK) -> Result<Option<msgs::CommitmentUpdate>, MsgHandleErrInternal> {
-               let ((res, mut pending_forwards, mut pending_failures, chan_monitor), short_channel_id) = {
+               let ((res, pending_forwards, mut pending_failures), short_channel_id) = {
                        let mut channel_state = self.channel_state.lock().unwrap();
                        match channel_state.by_id.get_mut(&msg.channel_id) {
                                Some(chan) => {
@@ -2071,45 +2218,19 @@ impl ChannelManager {
                                                //TODO: here and below MsgHandleErrInternal, #153 case
                                                return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!", msg.channel_id));
                                        }
-                                       (chan.revoke_and_ack(&msg).map_err(|e| MsgHandleErrInternal::from_maybe_close(e))?, chan.get_short_channel_id().expect("RAA should only work on a short-id-available channel"))
+                                       let (res, pending_forwards, pending_failures, chan_monitor) = chan.revoke_and_ack(&msg).map_err(|e| MsgHandleErrInternal::from_maybe_close(e))?;
+                                       if let Err(_e) = self.monitor.add_update_monitor(chan_monitor.get_funding_txo().unwrap(), chan_monitor) {
+                                               unimplemented!();
+                                       }
+                                       ((res, pending_forwards, pending_failures), chan.get_short_channel_id().expect("RAA should only work on a short-id-available channel"))
                                },
                                None => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel", msg.channel_id))
                        }
                };
-               if let Err(_e) = self.monitor.add_update_monitor(chan_monitor.get_funding_txo().unwrap(), chan_monitor) {
-                       unimplemented!();
-               }
                for failure in pending_failures.drain(..) {
                        self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), failure.0, &failure.1, failure.2);
                }
-
-               let mut forward_event = None;
-               if !pending_forwards.is_empty() {
-                       let mut channel_state = self.channel_state.lock().unwrap();
-                       if channel_state.forward_htlcs.is_empty() {
-                               forward_event = Some(Instant::now() + Duration::from_millis(((rng::rand_f32() * 4.0 + 1.0) * MIN_HTLC_RELAY_HOLDING_CELL_MILLIS as f32) as u64));
-                               channel_state.next_forward = forward_event.unwrap();
-                       }
-                       for (forward_info, prev_htlc_id) in pending_forwards.drain(..) {
-                               match channel_state.forward_htlcs.entry(forward_info.short_channel_id) {
-                                       hash_map::Entry::Occupied(mut entry) => {
-                                               entry.get_mut().push(HTLCForwardInfo { prev_short_channel_id: short_channel_id, prev_htlc_id, forward_info });
-                                       },
-                                       hash_map::Entry::Vacant(entry) => {
-                                               entry.insert(vec!(HTLCForwardInfo { prev_short_channel_id: short_channel_id, prev_htlc_id, forward_info }));
-                                       }
-                               }
-                       }
-               }
-               match forward_event {
-                       Some(time) => {
-                               let mut pending_events = self.pending_events.lock().unwrap();
-                               pending_events.push(events::Event::PendingHTLCsForwardable {
-                                       time_forwardable: time
-                               });
-                       }
-                       None => {},
-               }
+               self.forward_htlcs(&mut [(short_channel_id, pending_forwards)]);
 
                Ok(res)
        }
@@ -2169,7 +2290,7 @@ impl ChannelManager {
        }
 
        fn internal_channel_reestablish(&self, their_node_id: &PublicKey, msg: &msgs::ChannelReestablish) -> Result<(Option<msgs::FundingLocked>, Option<msgs::RevokeAndACK>, Option<msgs::CommitmentUpdate>, RAACommitmentOrder), MsgHandleErrInternal> {
-               let (res, chan_monitor) = {
+               let res = {
                        let mut channel_state = self.channel_state.lock().unwrap();
                        match channel_state.by_id.get_mut(&msg.channel_id) {
                                Some(chan) => {
@@ -2178,16 +2299,17 @@ impl ChannelManager {
                                        }
                                        let (funding_locked, revoke_and_ack, commitment_update, channel_monitor, order) = chan.channel_reestablish(msg)
                                                .map_err(|e| MsgHandleErrInternal::from_chan_maybe_close(e, msg.channel_id))?;
-                                       (Ok((funding_locked, revoke_and_ack, commitment_update, order)), channel_monitor)
+                                       if let Some(monitor) = channel_monitor {
+                                               if let Err(_e) = self.monitor.add_update_monitor(monitor.get_funding_txo().unwrap(), monitor) {
+                                                       unimplemented!();
+                                               }
+                                       }
+                                       Ok((funding_locked, revoke_and_ack, commitment_update, order))
                                },
                                None => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel", msg.channel_id))
                        }
                };
-               if let Some(monitor) = chan_monitor {
-                       if let Err(_e) = self.monitor.add_update_monitor(monitor.get_funding_txo().unwrap(), monitor) {
-                               unimplemented!();
-                       }
-               }
+
                res
        }
 
@@ -2204,6 +2326,9 @@ impl ChannelManager {
                                if !chan.is_outbound() {
                                        return Err(APIError::APIMisuseError{err: "update_fee cannot be sent for an inbound channel"});
                                }
+                               if chan.is_awaiting_monitor_update() {
+                                       return Err(APIError::MonitorUpdateFailed);
+                               }
                                if !chan.is_live() {
                                        return Err(APIError::ChannelUnavailable{err: "Channel is either not yet fully established or peer is currently disconnected"});
                                }
@@ -2553,7 +2678,7 @@ mod tests {
        use chain::transaction::OutPoint;
        use chain::chaininterface::ChainListener;
        use ln::channelmanager::{ChannelManager,OnionKeys};
-       use ln::channelmonitor::{CLTV_CLAIM_BUFFER, HTLC_FAIL_TIMEOUT_BLOCKS};
+       use ln::channelmonitor::{ChannelMonitorUpdateErr, CLTV_CLAIM_BUFFER, HTLC_FAIL_TIMEOUT_BLOCKS};
        use ln::router::{Route, RouteHop, Router};
        use ln::msgs;
        use ln::msgs::{ChannelMessageHandler,RoutingMessageHandler};
@@ -3018,15 +3143,17 @@ mod tests {
                commitment_msg: msgs::CommitmentSigned,
        }
        impl SendEvent {
+               fn from_commitment_update(node_id: PublicKey, updates: msgs::CommitmentUpdate) -> SendEvent {
+                       assert!(updates.update_fulfill_htlcs.is_empty());
+                       assert!(updates.update_fail_htlcs.is_empty());
+                       assert!(updates.update_fail_malformed_htlcs.is_empty());
+                       assert!(updates.update_fee.is_none());
+                       SendEvent { node_id: node_id, msgs: updates.update_add_htlcs, commitment_msg: updates.commitment_signed }
+               }
+
                fn from_event(event: Event) -> SendEvent {
                        match event {
-                               Event::UpdateHTLCs { node_id, updates: msgs::CommitmentUpdate { update_add_htlcs, update_fulfill_htlcs, update_fail_htlcs, update_fail_malformed_htlcs, update_fee, commitment_signed } } => {
-                                       assert!(update_fulfill_htlcs.is_empty());
-                                       assert!(update_fail_htlcs.is_empty());
-                                       assert!(update_fail_malformed_htlcs.is_empty());
-                                       assert!(update_fee.is_none());
-                                       SendEvent { node_id: node_id, msgs: update_add_htlcs, commitment_msg: commitment_signed }
-                               },
+                               Event::UpdateHTLCs { node_id, updates } => SendEvent::from_commitment_update(node_id, updates),
                                _ => panic!("Unexpected event type!"),
                        }
                }
@@ -4115,6 +4242,19 @@ mod tests {
                }
        }
 
+       macro_rules! expect_pending_htlcs_forwardable {
+               ($node: expr) => {{
+                       let events = $node.node.get_and_clear_pending_events();
+                       assert_eq!(events.len(), 1);
+                       match events[0] {
+                               Event::PendingHTLCsForwardable { .. } => { },
+                               _ => panic!("Unexpected event"),
+                       };
+                       $node.node.channel_state.lock().unwrap().next_forward = Instant::now();
+                       $node.node.process_pending_htlc_forwards();
+               }}
+       }
+
        #[test]
        fn channel_reserve_test() {
                use util::rng;
@@ -4147,19 +4287,6 @@ mod tests {
                        }}
                };
 
-               macro_rules! expect_pending_htlcs_forwardable {
-                       ($node: expr) => {{
-                               let events = $node.node.get_and_clear_pending_events();
-                               assert_eq!(events.len(), 1);
-                               match events[0] {
-                                       Event::PendingHTLCsForwardable { .. } => { },
-                                       _ => panic!("Unexpected event"),
-                               };
-                               $node.node.channel_state.lock().unwrap().next_forward = Instant::now();
-                               $node.node.process_pending_htlc_forwards();
-                       }}
-               };
-
                macro_rules! expect_forward {
                        ($node: expr) => {{
                                let mut events = $node.node.get_and_clear_pending_events();
@@ -5453,6 +5580,447 @@ mod tests {
                claim_payment(&nodes[0], &[&nodes[1]], payment_preimage_2);
        }
 
+       #[test]
+       fn test_simple_monitor_permanent_update_fail() {
+               // Test that we handle a simple permanent monitor update failure
+               let mut nodes = create_network(2);
+               create_announced_chan_between_nodes(&nodes, 0, 1);
+
+               let route = nodes[0].router.get_route(&nodes[1].node.get_our_node_id(), None, &Vec::new(), 1000000, TEST_FINAL_CLTV).unwrap();
+               let (_, payment_hash_1) = get_payment_preimage_hash!(nodes[0]);
+
+               *nodes[0].chan_monitor.update_ret.lock().unwrap() = Err(ChannelMonitorUpdateErr::PermanentFailure);
+               if let Err(APIError::MonitorUpdateFailed) = nodes[0].node.send_payment(route, payment_hash_1) {} else { panic!(); }
+               check_added_monitors!(nodes[0], 1);
+
+               let events_1 = nodes[0].node.get_and_clear_pending_events();
+               assert_eq!(events_1.len(), 1);
+               match events_1[0] {
+                       Event::BroadcastChannelUpdate { .. } => {},
+                       _ => panic!("Unexpected event"),
+               };
+
+               // TODO: Once we hit the chain with the failure transaction we should check that we get a
+               // PaymentFailed event
+
+               assert_eq!(nodes[0].node.list_channels().len(), 0);
+       }
+
+       fn do_test_simple_monitor_temporary_update_fail(disconnect: bool) {
+               // Test that we can recover from a simple temporary monitor update failure optionally with
+               // a disconnect in between
+               let mut nodes = create_network(2);
+               create_announced_chan_between_nodes(&nodes, 0, 1);
+
+               let route = nodes[0].router.get_route(&nodes[1].node.get_our_node_id(), None, &Vec::new(), 1000000, TEST_FINAL_CLTV).unwrap();
+               let (payment_preimage_1, payment_hash_1) = get_payment_preimage_hash!(nodes[0]);
+
+               *nodes[0].chan_monitor.update_ret.lock().unwrap() = Err(ChannelMonitorUpdateErr::TemporaryFailure);
+               if let Err(APIError::MonitorUpdateFailed) = nodes[0].node.send_payment(route.clone(), payment_hash_1) {} else { panic!(); }
+               check_added_monitors!(nodes[0], 1);
+
+               let events_1 = nodes[0].node.get_and_clear_pending_events();
+               assert!(events_1.is_empty());
+               assert_eq!(nodes[0].node.list_channels().len(), 1);
+
+               if disconnect {
+                       nodes[0].node.peer_disconnected(&nodes[1].node.get_our_node_id(), false);
+                       nodes[1].node.peer_disconnected(&nodes[0].node.get_our_node_id(), false);
+                       reconnect_nodes(&nodes[0], &nodes[1], true, (0, 0), (0, 0), (0, 0), (0, 0), (false, false));
+               }
+
+               *nodes[0].chan_monitor.update_ret.lock().unwrap() = Ok(());
+               nodes[0].node.test_restore_channel_monitor();
+               check_added_monitors!(nodes[0], 1);
+
+               let mut events_2 = nodes[0].node.get_and_clear_pending_events();
+               assert_eq!(events_2.len(), 1);
+               let payment_event = SendEvent::from_event(events_2.pop().unwrap());
+               assert_eq!(payment_event.node_id, nodes[1].node.get_our_node_id());
+               nodes[1].node.handle_update_add_htlc(&nodes[0].node.get_our_node_id(), &payment_event.msgs[0]).unwrap();
+               commitment_signed_dance!(nodes[1], nodes[0], payment_event.commitment_msg, false);
+
+               expect_pending_htlcs_forwardable!(nodes[1]);
+
+               let events_3 = nodes[1].node.get_and_clear_pending_events();
+               assert_eq!(events_3.len(), 1);
+               match events_3[0] {
+                       Event::PaymentReceived { ref payment_hash, amt } => {
+                               assert_eq!(payment_hash_1, *payment_hash);
+                               assert_eq!(amt, 1000000);
+                       },
+                       _ => panic!("Unexpected event"),
+               }
+
+               claim_payment(&nodes[0], &[&nodes[1]], payment_preimage_1);
+
+               // Now set it to failed again...
+               let (_, payment_hash_2) = get_payment_preimage_hash!(nodes[0]);
+               *nodes[0].chan_monitor.update_ret.lock().unwrap() = Err(ChannelMonitorUpdateErr::TemporaryFailure);
+               if let Err(APIError::MonitorUpdateFailed) = nodes[0].node.send_payment(route, payment_hash_2) {} else { panic!(); }
+               check_added_monitors!(nodes[0], 1);
+
+               let events_4 = nodes[0].node.get_and_clear_pending_events();
+               assert!(events_4.is_empty());
+               assert_eq!(nodes[0].node.list_channels().len(), 1);
+
+               if disconnect {
+                       nodes[0].node.peer_disconnected(&nodes[1].node.get_our_node_id(), false);
+                       nodes[1].node.peer_disconnected(&nodes[0].node.get_our_node_id(), false);
+                       reconnect_nodes(&nodes[0], &nodes[1], false, (0, 0), (0, 0), (0, 0), (0, 0), (false, false));
+               }
+
+               // ...and make sure we can force-close a TemporaryFailure channel with a PermanentFailure
+               *nodes[0].chan_monitor.update_ret.lock().unwrap() = Err(ChannelMonitorUpdateErr::PermanentFailure);
+               nodes[0].node.test_restore_channel_monitor();
+               check_added_monitors!(nodes[0], 1);
+
+               let events_5 = nodes[0].node.get_and_clear_pending_events();
+               assert_eq!(events_5.len(), 1);
+               match events_5[0] {
+                       Event::BroadcastChannelUpdate { .. } => {},
+                       _ => panic!("Unexpected event"),
+               }
+
+               // TODO: Once we hit the chain with the failure transaction we should check that we get a
+               // PaymentFailed event
+
+               assert_eq!(nodes[0].node.list_channels().len(), 0);
+       }
+
+       #[test]
+       fn test_simple_monitor_temporary_update_fail() {
+               do_test_simple_monitor_temporary_update_fail(false);
+               do_test_simple_monitor_temporary_update_fail(true);
+       }
+
+       fn do_test_monitor_temporary_update_fail(disconnect_count: usize) {
+               let disconnect_flags = 8 | 16;
+
+               // Test that we can recover from a temporary monitor update failure with some in-flight
+               // HTLCs going on at the same time potentially with some disconnection thrown in.
+               // * First we route a payment, then get a temporary monitor update failure when trying to
+               //   route a second payment. We then claim the first payment.
+               // * If disconnect_count is set, we will disconnect at this point (which is likely, as a
+               //   TemporaryFailure often indicates a network disconnect which resulted in failing to
+               //   update the ChannelMonitor on a watchtower).
+               // * If !(disconnect_count & 16) we deliver an update_fulfill_htlc/CS for the first payment
+               //   immediately, otherwise we wait for the disconnect and deliver them via the reconnect
+               //   channel_reestablish processing (ie disconnect_count & 16 makes no sense if
+               //   disconnect_count & !disconnect_flags is 0).
+               // * We then update the channel monitor, reconnecting if disconnect_count is set and walk
+               //   through message sending, potentially disconnect/reconnecting multiple times based on
+               //   disconnect_count, to get the update_fulfill_htlc through.
+               // * We then walk through more message exchanges to get the original update_add_htlc
+               //   through, swapping message ordering based on disconnect_count & 8 and optionally
+               //   disconnect/reconnecting based on disconnect_count.
+               let mut nodes = create_network(2);
+               create_announced_chan_between_nodes(&nodes, 0, 1);
+
+               let (payment_preimage_1, _) = route_payment(&nodes[0], &[&nodes[1]], 1000000);
+
+               // Now try to send a second payment which will fail to send
+               let route = nodes[0].router.get_route(&nodes[1].node.get_our_node_id(), None, &Vec::new(), 1000000, TEST_FINAL_CLTV).unwrap();
+               let (payment_preimage_2, payment_hash_2) = get_payment_preimage_hash!(nodes[0]);
+
+               *nodes[0].chan_monitor.update_ret.lock().unwrap() = Err(ChannelMonitorUpdateErr::TemporaryFailure);
+               if let Err(APIError::MonitorUpdateFailed) = nodes[0].node.send_payment(route.clone(), payment_hash_2) {} else { panic!(); }
+               check_added_monitors!(nodes[0], 1);
+
+               let events_1 = nodes[0].node.get_and_clear_pending_events();
+               assert!(events_1.is_empty());
+               assert_eq!(nodes[0].node.list_channels().len(), 1);
+
+               // Claim the previous payment, which will result in an update_fulfill_htlc/CS from nodes[1]
+               // but nodes[0] won't respond since it is frozen.
+               assert!(nodes[1].node.claim_funds(payment_preimage_1));
+               check_added_monitors!(nodes[1], 1);
+               let events_2 = nodes[1].node.get_and_clear_pending_events();
+               assert_eq!(events_2.len(), 1);
+               let (bs_initial_fulfill, bs_initial_commitment_signed) = match events_2[0] {
+                       Event::UpdateHTLCs { ref node_id, updates: msgs::CommitmentUpdate { ref update_add_htlcs, ref update_fulfill_htlcs, ref update_fail_htlcs, ref update_fail_malformed_htlcs, ref update_fee, ref commitment_signed } } => {
+                               assert_eq!(*node_id, nodes[0].node.get_our_node_id());
+                               assert!(update_add_htlcs.is_empty());
+                               assert_eq!(update_fulfill_htlcs.len(), 1);
+                               assert!(update_fail_htlcs.is_empty());
+                               assert!(update_fail_malformed_htlcs.is_empty());
+                               assert!(update_fee.is_none());
+
+                               if (disconnect_count & 16) == 0 {
+                                       nodes[0].node.handle_update_fulfill_htlc(&nodes[1].node.get_our_node_id(), &update_fulfill_htlcs[0]).unwrap();
+                                       let events_3 = nodes[0].node.get_and_clear_pending_events();
+                                       assert_eq!(events_3.len(), 1);
+                                       match events_3[0] {
+                                               Event::PaymentSent { ref payment_preimage } => {
+                                                       assert_eq!(*payment_preimage, payment_preimage_1);
+                                               },
+                                               _ => panic!("Unexpected event"),
+                                       }
+
+                                       if let Err(msgs::HandleError{err, action: Some(msgs::ErrorAction::IgnoreError) }) = nodes[0].node.handle_commitment_signed(&nodes[1].node.get_our_node_id(), commitment_signed) {
+                                               assert_eq!(err, "Previous monitor update failure prevented generation of RAA");
+                                       } else { panic!(); }
+                               }
+
+                               (update_fulfill_htlcs[0].clone(), commitment_signed.clone())
+                       },
+                       _ => panic!("Unexpected event"),
+               };
+
+               if disconnect_count & !disconnect_flags > 0 {
+                       nodes[0].node.peer_disconnected(&nodes[1].node.get_our_node_id(), false);
+                       nodes[1].node.peer_disconnected(&nodes[0].node.get_our_node_id(), false);
+               }
+
+               // Now fix monitor updating...
+               *nodes[0].chan_monitor.update_ret.lock().unwrap() = Ok(());
+               nodes[0].node.test_restore_channel_monitor();
+               check_added_monitors!(nodes[0], 1);
+
+               macro_rules! disconnect_reconnect_peers { () => { {
+                       nodes[0].node.peer_disconnected(&nodes[1].node.get_our_node_id(), false);
+                       nodes[1].node.peer_disconnected(&nodes[0].node.get_our_node_id(), false);
+
+                       let reestablish_1 = nodes[0].node.peer_connected(&nodes[1].node.get_our_node_id());
+                       assert_eq!(reestablish_1.len(), 1);
+                       let reestablish_2 = nodes[1].node.peer_connected(&nodes[0].node.get_our_node_id());
+                       assert_eq!(reestablish_2.len(), 1);
+
+                       let as_resp = nodes[0].node.handle_channel_reestablish(&nodes[1].node.get_our_node_id(), &reestablish_2[0]).unwrap();
+                       let bs_resp = nodes[1].node.handle_channel_reestablish(&nodes[0].node.get_our_node_id(), &reestablish_1[0]).unwrap();
+
+                       assert!(as_resp.0.is_none());
+                       assert!(bs_resp.0.is_none());
+
+                       (reestablish_1, reestablish_2, as_resp, bs_resp)
+               } } }
+
+               let (payment_event, initial_revoke_and_ack) = if disconnect_count & !disconnect_flags > 0 {
+                       let events_4 = nodes[0].node.get_and_clear_pending_events();
+                       assert!(events_4.is_empty());
+
+                       let reestablish_1 = nodes[0].node.peer_connected(&nodes[1].node.get_our_node_id());
+                       assert_eq!(reestablish_1.len(), 1);
+                       let reestablish_2 = nodes[1].node.peer_connected(&nodes[0].node.get_our_node_id());
+                       assert_eq!(reestablish_2.len(), 1);
+
+                       let mut as_resp = nodes[0].node.handle_channel_reestablish(&nodes[1].node.get_our_node_id(), &reestablish_2[0]).unwrap();
+                       check_added_monitors!(nodes[0], 0);
+                       let mut bs_resp = nodes[1].node.handle_channel_reestablish(&nodes[0].node.get_our_node_id(), &reestablish_1[0]).unwrap();
+                       check_added_monitors!(nodes[1], 0);
+
+                       assert!(as_resp.0.is_none());
+                       assert!(bs_resp.0.is_none());
+
+                       assert!(bs_resp.1.is_none());
+                       if (disconnect_count & 16) == 0 {
+                               assert!(bs_resp.2.is_none());
+
+                               assert!(as_resp.1.is_some());
+                               assert!(as_resp.2.is_some());
+                               assert!(as_resp.3 == msgs::RAACommitmentOrder::CommitmentFirst);
+                       } else {
+                               assert!(bs_resp.2.as_ref().unwrap().update_add_htlcs.is_empty());
+                               assert!(bs_resp.2.as_ref().unwrap().update_fail_htlcs.is_empty());
+                               assert!(bs_resp.2.as_ref().unwrap().update_fail_malformed_htlcs.is_empty());
+                               assert!(bs_resp.2.as_ref().unwrap().update_fee.is_none());
+                               assert!(bs_resp.2.as_ref().unwrap().update_fulfill_htlcs == vec![bs_initial_fulfill]);
+                               assert!(bs_resp.2.as_ref().unwrap().commitment_signed == bs_initial_commitment_signed);
+
+                               assert!(as_resp.1.is_none());
+
+                               nodes[0].node.handle_update_fulfill_htlc(&nodes[1].node.get_our_node_id(), &bs_resp.2.as_ref().unwrap().update_fulfill_htlcs[0]).unwrap();
+                               let events_3 = nodes[0].node.get_and_clear_pending_events();
+                               assert_eq!(events_3.len(), 1);
+                               match events_3[0] {
+                                       Event::PaymentSent { ref payment_preimage } => {
+                                               assert_eq!(*payment_preimage, payment_preimage_1);
+                                       },
+                                       _ => panic!("Unexpected event"),
+                               }
+
+                               let (as_resp_raa, as_resp_cu) = nodes[0].node.handle_commitment_signed(&nodes[1].node.get_our_node_id(), &bs_resp.2.as_ref().unwrap().commitment_signed).unwrap();
+                               assert!(as_resp_cu.is_none());
+                               check_added_monitors!(nodes[0], 1);
+
+                               as_resp.1 = Some(as_resp_raa);
+                               bs_resp.2 = None;
+                       }
+
+                       if disconnect_count & !disconnect_flags > 1 {
+                               let (second_reestablish_1, second_reestablish_2, second_as_resp, second_bs_resp) = disconnect_reconnect_peers!();
+
+                               if (disconnect_count & 16) == 0 {
+                                       assert!(reestablish_1 == second_reestablish_1);
+                                       assert!(reestablish_2 == second_reestablish_2);
+                               }
+                               assert!(as_resp == second_as_resp);
+                               assert!(bs_resp == second_bs_resp);
+                       }
+
+                       (SendEvent::from_commitment_update(nodes[1].node.get_our_node_id(), as_resp.2.unwrap()), as_resp.1.unwrap())
+               } else {
+                       let mut events_4 = nodes[0].node.get_and_clear_pending_events();
+                       assert_eq!(events_4.len(), 2);
+                       (SendEvent::from_event(events_4.remove(0)), match events_4[0] {
+                               Event::SendRevokeAndACK { ref node_id, ref msg } => {
+                                       assert_eq!(*node_id, nodes[1].node.get_our_node_id());
+                                       msg.clone()
+                               },
+                               _ => panic!("Unexpected event"),
+                       })
+               };
+
+               assert_eq!(payment_event.node_id, nodes[1].node.get_our_node_id());
+
+               nodes[1].node.handle_update_add_htlc(&nodes[0].node.get_our_node_id(), &payment_event.msgs[0]).unwrap();
+               let (bs_revoke_and_ack, bs_commitment_signed) = nodes[1].node.handle_commitment_signed(&nodes[0].node.get_our_node_id(), &payment_event.commitment_msg).unwrap();
+               assert!(bs_commitment_signed.is_none()); // nodes[1] is awaiting an RAA from nodes[0] still
+               check_added_monitors!(nodes[1], 1);
+
+               if disconnect_count & !disconnect_flags > 2 {
+                       let (_, _, as_resp, bs_resp) = disconnect_reconnect_peers!();
+
+                       assert!(as_resp.1.unwrap() == initial_revoke_and_ack);
+                       assert!(bs_resp.1.unwrap() == bs_revoke_and_ack);
+
+                       assert!(as_resp.2.is_none());
+                       assert!(bs_resp.2.is_none());
+               }
+
+               let as_commitment_update;
+               let bs_second_commitment_update;
+
+               macro_rules! handle_bs_raa { () => {
+                       as_commitment_update = nodes[0].node.handle_revoke_and_ack(&nodes[1].node.get_our_node_id(), &bs_revoke_and_ack).unwrap().unwrap();
+                       assert!(as_commitment_update.update_add_htlcs.is_empty());
+                       assert!(as_commitment_update.update_fulfill_htlcs.is_empty());
+                       assert!(as_commitment_update.update_fail_htlcs.is_empty());
+                       assert!(as_commitment_update.update_fail_malformed_htlcs.is_empty());
+                       assert!(as_commitment_update.update_fee.is_none());
+                       check_added_monitors!(nodes[0], 1);
+               } }
+
+               macro_rules! handle_initial_raa { () => {
+                       bs_second_commitment_update = nodes[1].node.handle_revoke_and_ack(&nodes[0].node.get_our_node_id(), &initial_revoke_and_ack).unwrap().unwrap();
+                       assert!(bs_second_commitment_update.update_add_htlcs.is_empty());
+                       assert!(bs_second_commitment_update.update_fulfill_htlcs.is_empty());
+                       assert!(bs_second_commitment_update.update_fail_htlcs.is_empty());
+                       assert!(bs_second_commitment_update.update_fail_malformed_htlcs.is_empty());
+                       assert!(bs_second_commitment_update.update_fee.is_none());
+                       check_added_monitors!(nodes[1], 1);
+               } }
+
+               if (disconnect_count & 8) == 0 {
+                       handle_bs_raa!();
+
+                       if disconnect_count & !disconnect_flags > 3 {
+                               let (_, _, as_resp, bs_resp) = disconnect_reconnect_peers!();
+
+                               assert!(as_resp.1.unwrap() == initial_revoke_and_ack);
+                               assert!(bs_resp.1.is_none());
+
+                               assert!(as_resp.2.unwrap() == as_commitment_update);
+                               assert!(bs_resp.2.is_none());
+
+                               assert!(as_resp.3 == msgs::RAACommitmentOrder::RevokeAndACKFirst);
+                       }
+
+                       handle_initial_raa!();
+
+                       if disconnect_count & !disconnect_flags > 4 {
+                               let (_, _, as_resp, bs_resp) = disconnect_reconnect_peers!();
+
+                               assert!(as_resp.1.is_none());
+                               assert!(bs_resp.1.is_none());
+
+                               assert!(as_resp.2.unwrap() == as_commitment_update);
+                               assert!(bs_resp.2.unwrap() == bs_second_commitment_update);
+                       }
+               } else {
+                       handle_initial_raa!();
+
+                       if disconnect_count & !disconnect_flags > 3 {
+                               let (_, _, as_resp, bs_resp) = disconnect_reconnect_peers!();
+
+                               assert!(as_resp.1.is_none());
+                               assert!(bs_resp.1.unwrap() == bs_revoke_and_ack);
+
+                               assert!(as_resp.2.is_none());
+                               assert!(bs_resp.2.unwrap() == bs_second_commitment_update);
+
+                               assert!(bs_resp.3 == msgs::RAACommitmentOrder::RevokeAndACKFirst);
+                       }
+
+                       handle_bs_raa!();
+
+                       if disconnect_count & !disconnect_flags > 4 {
+                               let (_, _, as_resp, bs_resp) = disconnect_reconnect_peers!();
+
+                               assert!(as_resp.1.is_none());
+                               assert!(bs_resp.1.is_none());
+
+                               assert!(as_resp.2.unwrap() == as_commitment_update);
+                               assert!(bs_resp.2.unwrap() == bs_second_commitment_update);
+                       }
+               }
+
+               let (as_revoke_and_ack, as_commitment_signed) = nodes[0].node.handle_commitment_signed(&nodes[1].node.get_our_node_id(), &bs_second_commitment_update.commitment_signed).unwrap();
+               assert!(as_commitment_signed.is_none());
+               check_added_monitors!(nodes[0], 1);
+
+               let (bs_second_revoke_and_ack, bs_third_commitment_signed) = nodes[1].node.handle_commitment_signed(&nodes[0].node.get_our_node_id(), &as_commitment_update.commitment_signed).unwrap();
+               assert!(bs_third_commitment_signed.is_none());
+               check_added_monitors!(nodes[1], 1);
+
+               assert!(nodes[1].node.handle_revoke_and_ack(&nodes[0].node.get_our_node_id(), &as_revoke_and_ack).unwrap().is_none());
+               check_added_monitors!(nodes[1], 1);
+
+               assert!(nodes[0].node.handle_revoke_and_ack(&nodes[1].node.get_our_node_id(), &bs_second_revoke_and_ack).unwrap().is_none());
+               check_added_monitors!(nodes[0], 1);
+
+               expect_pending_htlcs_forwardable!(nodes[1]);
+
+               let events_5 = nodes[1].node.get_and_clear_pending_events();
+               assert_eq!(events_5.len(), 1);
+               match events_5[0] {
+                       Event::PaymentReceived { ref payment_hash, amt } => {
+                               assert_eq!(payment_hash_2, *payment_hash);
+                               assert_eq!(amt, 1000000);
+                       },
+                       _ => panic!("Unexpected event"),
+               }
+
+               claim_payment(&nodes[0], &[&nodes[1]], payment_preimage_2);
+       }
+
+       #[test]
+       fn test_monitor_temporary_update_fail_a() {
+               do_test_monitor_temporary_update_fail(0);
+               do_test_monitor_temporary_update_fail(1);
+               do_test_monitor_temporary_update_fail(2);
+               do_test_monitor_temporary_update_fail(3);
+               do_test_monitor_temporary_update_fail(4);
+               do_test_monitor_temporary_update_fail(5);
+       }
+
+       #[test]
+       fn test_monitor_temporary_update_fail_b() {
+               do_test_monitor_temporary_update_fail(2 | 8);
+               do_test_monitor_temporary_update_fail(3 | 8);
+               do_test_monitor_temporary_update_fail(4 | 8);
+               do_test_monitor_temporary_update_fail(5 | 8);
+       }
+
+       #[test]
+       fn test_monitor_temporary_update_fail_c() {
+               do_test_monitor_temporary_update_fail(1 | 16);
+               do_test_monitor_temporary_update_fail(2 | 16);
+               do_test_monitor_temporary_update_fail(3 | 16);
+               do_test_monitor_temporary_update_fail(2 | 8 | 16);
+               do_test_monitor_temporary_update_fail(3 | 8 | 16);
+       }
+
        #[test]
        fn test_invalid_channel_announcement() {
                //Test BOLT 7 channel_announcement msg requirement for final node, gather data to build custom channel_announcement msgs
index 59ed177b38fb3da0743e4a2ee71332f0e9abc1b0..60cb9c91583d604fa8fa464bd0b0f60790c48005 100644 (file)
@@ -39,6 +39,7 @@ use std::sync::{Arc,Mutex};
 use std::{hash,cmp};
 
 /// An error enum representing a failure to persist a channel monitor update.
+#[derive(Clone)]
 pub enum ChannelMonitorUpdateErr {
        /// Used to indicate a temporary failure (eg connection to a watchtower failed, but is expected
        /// to succeed at some point in the future).
@@ -47,6 +48,22 @@ pub enum ChannelMonitorUpdateErr {
        /// submitting new commitment transactions to the remote party.
        /// ChannelManager::test_restore_channel_monitor can be used to retry the update(s) and restore
        /// the channel to an operational state.
+       ///
+       /// Note that continuing to operate when no copy of the updated ChannelMonitor could be
+       /// persisted is unsafe - if you failed to store the update on your own local disk you should
+       /// instead return PermanentFailure to force closure of the channel ASAP.
+       ///
+       /// Even when a channel has been "frozen", updates to the ChannelMonitor can continue to occur
+       /// (eg if an inbound HTLC which we forwarded was claimed upstream, resulting in us attempting
+       /// to claim it on this channel), and those updates must be applied wherever they can be. At
+       /// least one such updated ChannelMonitor must be persisted, otherwise PermanentFailure should
+       /// be returned to get things on-chain ASAP using only the in-memory copy. Obviously, updates to
+       /// the channel which would invalidate previous ChannelMonitors are not made when a channel has
+       /// been "frozen".
+       ///
+       /// Note that even if updates made after TemporaryFailure succeed you must still call
+       /// test_restore_channel_monitor to ensure you have the latest monitor and re-enable normal
+       /// channel operation.
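+       ///
+       /// A minimal sketch of the intended client flow (error handling and the actual persistence
+       /// retry are elided; `channel_manager`, `route` and `payment_hash` are assumed to exist):
+       ///
+       /// ```ignore
+       /// match channel_manager.send_payment(route, payment_hash) {
+       ///     Err(APIError::MonitorUpdateFailed) => {
+       ///         // The channel is now frozen. Once the ChannelMonitor really has been persisted,
+       ///         // unfreeze it and replay the messages which were held back:
+       ///         channel_manager.test_restore_channel_monitor();
+       ///     },
+       ///     res => { /* handle other results as usual */ },
+       /// }
+       /// ```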
        TemporaryFailure,
        /// Used to indicate no further channel monitor updates will be allowed (eg we've moved on to a
        /// different watchtower and cannot update with all watchtowers that were previously informed
index bab2674e6fac0e96175239cbca78c2b8236ae624..b5db51cf2bb4a200c58e6e03c0bf52a65626b54c 100644 (file)
@@ -224,7 +224,7 @@ pub struct FundingSigned {
 }
 
 /// A funding_locked message to be sent or received from a peer
-#[derive(Clone)]
+#[derive(Clone, PartialEq)]
 pub struct FundingLocked {
        pub(crate) channel_id: [u8; 32],
        pub(crate) next_per_commitment_point: PublicKey,
@@ -244,7 +244,7 @@ pub struct ClosingSigned {
 }
 
 /// An update_add_htlc message to be sent or received from a peer
-#[derive(Clone)]
+#[derive(Clone, PartialEq)]
 pub struct UpdateAddHTLC {
        pub(crate) channel_id: [u8; 32],
        pub(crate) htlc_id: u64,
@@ -255,7 +255,7 @@ pub struct UpdateAddHTLC {
 }
 
 /// An update_fulfill_htlc message to be sent or received from a peer
-#[derive(Clone)]
+#[derive(Clone, PartialEq)]
 pub struct UpdateFulfillHTLC {
        pub(crate) channel_id: [u8; 32],
        pub(crate) htlc_id: u64,
@@ -263,7 +263,7 @@ pub struct UpdateFulfillHTLC {
 }
 
 /// An update_fail_htlc message to be sent or received from a peer
-#[derive(Clone)]
+#[derive(Clone, PartialEq)]
 pub struct UpdateFailHTLC {
        pub(crate) channel_id: [u8; 32],
        pub(crate) htlc_id: u64,
@@ -271,7 +271,7 @@ pub struct UpdateFailHTLC {
 }
 
 /// An update_fail_malformed_htlc message to be sent or received from a peer
-#[derive(Clone)]
+#[derive(Clone, PartialEq)]
 pub struct UpdateFailMalformedHTLC {
        pub(crate) channel_id: [u8; 32],
        pub(crate) htlc_id: u64,
@@ -280,7 +280,7 @@ pub struct UpdateFailMalformedHTLC {
 }
 
 /// A commitment_signed message to be sent or received from a peer
-#[derive(Clone)]
+#[derive(Clone, PartialEq)]
 pub struct CommitmentSigned {
        pub(crate) channel_id: [u8; 32],
        pub(crate) signature: Signature,
@@ -288,6 +288,7 @@ pub struct CommitmentSigned {
 }
 
 /// A revoke_and_ack message to be sent or received from a peer
+#[derive(Clone, PartialEq)]
 pub struct RevokeAndACK {
        pub(crate) channel_id: [u8; 32],
        pub(crate) per_commitment_secret: [u8; 32],
@@ -295,17 +296,20 @@ pub struct RevokeAndACK {
 }
 
 /// An update_fee message to be sent or received from a peer
+#[derive(PartialEq)]
 pub struct UpdateFee {
        pub(crate) channel_id: [u8; 32],
        pub(crate) feerate_per_kw: u32,
 }
 
+#[derive(PartialEq)]
 pub(crate) struct DataLossProtect {
        pub(crate) your_last_per_commitment_secret: [u8; 32],
        pub(crate) my_current_per_commitment_point: PublicKey,
 }
 
 /// A channel_reestablish message to be sent or received from a peer
+#[derive(PartialEq)]
 pub struct ChannelReestablish {
        pub(crate) channel_id: [u8; 32],
        pub(crate) next_local_commitment_number: u64,
@@ -463,6 +467,7 @@ pub struct HandleError { //TODO: rename me
 
 /// Struct used to return values from revoke_and_ack messages, containing a bunch of commitment
 /// transaction updates if they were pending.
+#[derive(PartialEq)]
 pub struct CommitmentUpdate {
        pub(crate) update_add_htlcs: Vec<UpdateAddHTLC>,
        pub(crate) update_fulfill_htlcs: Vec<UpdateFulfillHTLC>,
@@ -629,7 +634,18 @@ pub(crate) struct OnionPacket {
        pub(crate) hmac: [u8; 32],
 }
 
-#[derive(Clone)]
+impl PartialEq for OnionPacket {
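+       // hop_data is compared element-wise here because large fixed-size arrays do not get PartialEq
+       // via #[derive] (or ==) on the compilers of this era; the remaining fields compare directly.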
+       fn eq(&self, other: &OnionPacket) -> bool {
+               for (i, j) in self.hop_data.iter().zip(other.hop_data.iter()) {
+                       if i != j { return false; }
+               }
+               self.version == other.version &&
+                       self.public_key == other.public_key &&
+                       self.hmac == other.hmac
+       }
+}
+
+#[derive(Clone, PartialEq)]
 pub(crate) struct OnionErrorPacket {
        // This really should be a constant size slice, but the spec lets these things be up to 128KB?
        // (TODO) We limit it in decode to much lower...
index b629e7fa6b01c011350a036f96a10f3227cacf7f..94cdef9e41fe76f7fa568edba784961edaeb8825 100644 (file)
@@ -866,6 +866,17 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
                                                Self::do_attempt_write_data(&mut descriptor, peer);
                                                continue;
                                        },
+                                       Event::SendRevokeAndACK { ref node_id, ref msg } => {
+                                               log_trace!(self, "Handling SendRevokeAndACK event in peer_handler for node {} for channel {}",
+                                                               log_pubkey!(node_id),
+                                                               log_bytes!(msg.channel_id));
+                                               let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, {
+                                                               //TODO: Do whatever we're gonna do for handling dropped messages
+                                                       });
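+                                               // 133 is the BOLT #2 message type for revoke_and_ack.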
+                                               peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 133)));
+                                               Self::do_attempt_write_data(&mut descriptor, peer);
+                                               continue;
+                                       },
                                        Event::SendShutdown { ref node_id, ref msg } => {
                                                log_trace!(self, "Handling Shutdown event in peer_handler for node {} for channel {}",
                                                                log_pubkey!(node_id),
index 9446b35854baf9e34f665d3e1755ab7f599c4992..66ddf84d96871857ed027cc5c3c74f3bfb749dfb 100644 (file)
@@ -32,7 +32,10 @@ pub enum APIError {
        ChannelUnavailable {
                /// A human-readable error message
                err: &'static str
-       }
+       },
+       /// An attempt to call add_update_monitor returned an Err (ie you did this!), causing the
+       /// attempted action to fail.
+       MonitorUpdateFailed,
 }
 
 impl fmt::Debug for APIError {
@@ -42,6 +45,7 @@ impl fmt::Debug for APIError {
                        APIError::FeeRateTooHigh {ref err, ref feerate} => write!(f, "{} feerate: {}", err, feerate),
                        APIError::RouteError {ref err} => f.write_str(err),
                        APIError::ChannelUnavailable {ref err} => f.write_str(err),
+                       APIError::MonitorUpdateFailed => f.write_str("Client indicated a channel monitor update failed"),
                }
        }
 }
index 51417c63c75ef0fd7f5e4694d9d35fca7a9d3cfe..e11e4e82c1ccbf2b735c804ab288c4ea215bc13e 100644 (file)
@@ -129,6 +129,15 @@ pub enum Event {
                /// The update messages which should be sent. ALL messages in the struct should be sent!
                updates: msgs::CommitmentUpdate,
        },
+       /// Used to indicate that a revoke_and_ack message should be sent to the peer with the given node_id.
+       ///
+       /// This event is handled by PeerManager::process_events if you are using a PeerManager.
+       SendRevokeAndACK {
+               /// The node_id of the node which should receive this message
+               node_id: PublicKey,
+               /// The message which should be sent.
+               msg: msgs::RevokeAndACK,
+       },
        /// Used to indicate that a shutdown message should be sent to the peer with the given node_id.
        ///
        /// This event is handled by PeerManager::process_events if you are using a PeerManager.
index 8ab02b2674a60ed68c06ebc67ee3561ad19077be..2577bc9f9badae0f6bd2acbb89f2011424cf1e4e 100644 (file)
@@ -38,12 +38,14 @@ impl chaininterface::FeeEstimator for TestFeeEstimator {
 pub struct TestChannelMonitor {
        pub added_monitors: Mutex<Vec<(OutPoint, channelmonitor::ChannelMonitor)>>,
        pub simple_monitor: Arc<channelmonitor::SimpleManyChannelMonitor<OutPoint>>,
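+       /// The result add_update_monitor will hand back to the caller. The update is still applied to
+       /// simple_monitor either way; tests set this to Err(Temporary/PermanentFailure) to simulate a
+       /// failed persist.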
+       pub update_ret: Mutex<Result<(), channelmonitor::ChannelMonitorUpdateErr>>,
 }
 impl TestChannelMonitor {
        pub fn new(chain_monitor: Arc<chaininterface::ChainWatchInterface>, broadcaster: Arc<chaininterface::BroadcasterInterface>) -> Self {
                Self {
                        added_monitors: Mutex::new(Vec::new()),
                        simple_monitor: channelmonitor::SimpleManyChannelMonitor::new(chain_monitor, broadcaster),
+                       update_ret: Mutex::new(Ok(())),
                }
        }
 }
@@ -57,7 +59,8 @@ impl channelmonitor::ManyChannelMonitor for TestChannelMonitor {
                w.0.clear();
                monitor.write_for_watchtower(&mut w).unwrap(); // This at least shouldn't crash...
                self.added_monitors.lock().unwrap().push((funding_txo, monitor.clone()));
-               self.simple_monitor.add_update_monitor(funding_txo, monitor)
+               assert!(self.simple_monitor.add_update_monitor(funding_txo, monitor).is_ok());
+               self.update_ret.lock().unwrap().clone()
        }
 }