Add pending-HTLC-failure to and unify force_shutdown() handling 2018-07-force-close-handling
authorMatt Corallo <git@bluematt.me>
Sat, 28 Jul 2018 23:15:45 +0000 (19:15 -0400)
committerMatt Corallo <git@bluematt.me>
Sun, 29 Jul 2018 06:23:02 +0000 (02:23 -0400)
This patch got a bit bigger than I'd intended, but primarily this
unifies force_shutdown() handling so all the callsites at least
look similar. It also fails backwards any HTLCs which were
completely pending (ie hadn't been committed to) and ensures we
broadcast our local commitment transaction. It also adds a
force_close_channel method to ChannelManager to expose
force-closure.

src/ln/channel.rs
src/ln/channelmanager.rs

index 0869796232501d69748ad14c549c3a067dea1964..fc67085250a3e1f429d53dcffa06f7ff3455a0d9 100644 (file)
@@ -1836,6 +1836,7 @@ impl Channel {
 
        /// Guaranteed to be Some after both FundingLocked messages have been exchanged (and, thus,
        /// is_usable() returns true).
+       /// Allowed in any state (including after shutdown)
        pub fn get_short_channel_id(&self) -> Option<u64> {
                self.short_channel_id
        }
@@ -1846,10 +1847,12 @@ impl Channel {
                self.channel_monitor.get_funding_txo()
        }
 
+       /// Allowed in any state (including after shutdown)
        pub fn get_their_node_id(&self) -> PublicKey {
                self.their_node_id
        }
 
+       /// Allowed in any state (including after shutdown)
        pub fn get_our_htlc_minimum_msat(&self) -> u64 {
                self.our_htlc_minimum_msat
        }
@@ -1858,6 +1861,7 @@ impl Channel {
                self.channel_value_satoshis
        }
 
+       /// Allowed in any state (including after shutdown)
        pub fn get_channel_update_count(&self) -> u32 {
                self.channel_update_count
        }
@@ -1867,6 +1871,7 @@ impl Channel {
        }
 
        /// Gets the fee we'd want to charge for adding an HTLC output to this Channel
+       /// Allowed in any state (including after shutdown)
        pub fn get_our_fee_base_msat(&self, fee_estimator: &FeeEstimator) -> u32 {
                // For lack of a better metric, we calculate what it would cost to consolidate the new HTLC
                // output value back into a transaction with the regular channel output:
@@ -1886,6 +1891,7 @@ impl Channel {
        }
 
        /// Returns true if this channel is fully established and not known to be closing.
+       /// Allowed in any state (including after shutdown)
        pub fn is_usable(&self) -> bool {
                let mask = ChannelState::ChannelFunded as u32 | BOTH_SIDES_SHUTDOWN_MASK;
                (self.channel_state & mask) == (ChannelState::ChannelFunded as u32)
@@ -1893,6 +1899,7 @@ impl Channel {
 
        /// Returns true if this channel is currently available for use. This is a superset of
        /// is_usable() and considers things like the channel being temporarily disabled.
+       /// Allowed in any state (including after shutdown)
        pub fn is_live(&self) -> bool {
                self.is_usable()
        }
@@ -2332,14 +2339,39 @@ impl Channel {
        }
 
        /// Gets the latest commitment transaction and any dependant transactions for relay (forcing
-       /// shutdown of this channel - no more calls into this Channel may be made afterwards.
-       pub fn force_shutdown(&mut self) -> Vec<Transaction> {
+       /// shutdown of this channel - no more calls into this Channel may be made afterwards except
+       /// those explicitly stated to be allowed after shutdown completes, eg some simple getters).
+       /// Also returns the list of payment_hashes for channels which we can safely fail backwards
+       /// immediately (others we will have to allow to time out).
+       pub fn force_shutdown(&mut self) -> (Vec<Transaction>, Vec<[u8; 32]>) {
                assert!(self.channel_state != ChannelState::ShutdownComplete as u32);
+
+               // We go ahead and "free" any holding cell HTLCs or HTLCs we haven't yet committed to and
+               // return them to fail the payment.
+               let mut dropped_outbound_htlcs = Vec::with_capacity(self.holding_cell_htlc_updates.len());
+               for htlc_update in self.holding_cell_htlc_updates.drain(..) {
+                       match htlc_update {
+                               HTLCUpdateAwaitingACK::AddHTLC { payment_hash, .. } => {
+                                       dropped_outbound_htlcs.push(payment_hash);
+                               },
+                               _ => {}
+                       }
+               }
+
+               for htlc in self.pending_htlcs.drain(..) {
+                       if htlc.state == HTLCState::LocalAnnounced {
+                               dropped_outbound_htlcs.push(htlc.payment_hash);
+                       }
+                       //TODO: Do something with the remaining HTLCs
+                       //(we need to have the ChannelManager monitor them so we can claim the inbound HTLCs
+                       //which correspond)
+               }
+
                self.channel_state = ChannelState::ShutdownComplete as u32;
                self.channel_update_count += 1;
                let mut res = Vec::new();
                mem::swap(&mut res, &mut self.last_local_commitment_txn);
-               res
+               (res, dropped_outbound_htlcs)
        }
 }
 
index 45abf69604b54deca266b4fd17a3c26593189d92..af2e5e600993df37a2beac3925aa55dac2302ed5 100644 (file)
@@ -365,6 +365,47 @@ impl ChannelManager {
                Ok(())
        }
 
+       #[inline]
+       fn finish_force_close_channel(&self, shutdown_res: (Vec<Transaction>, Vec<[u8; 32]>)) {
+               let (local_txn, failed_htlcs) = shutdown_res;
+               for payment_hash in failed_htlcs {
+                       // unknown_next_peer...I dunno who that is anymore....
+                       self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), &payment_hash, HTLCFailReason::Reason { failure_code: 0x4000 | 10, data: Vec::new() });
+               }
+               for tx in local_txn {
+                       self.tx_broadcaster.broadcast_transaction(&tx);
+               }
+               //TODO: We need to have a way where outbound HTLC claims can result in us claiming the
+               //now-on-chain HTLC output for ourselves (and, thereafter, passing the HTLC backwards).
+               //TODO: We need to handle monitoring of pending offered HTLCs which just hit the chain and
+               //may be claimed, resulting in us claiming the inbound HTLCs (and back-failing after
+               //timeouts are hit and our claims confirm).
+       }
+
+       /// Force closes a channel, immediately broadcasting the latest local commitment transaction to
+       /// the chain and rejecting new HTLCs on the given channel.
+       pub fn force_close_channel(&self, channel_id: &[u8; 32]) {
+               let mut chan = {
+                       let mut channel_state_lock = self.channel_state.lock().unwrap();
+                       let channel_state = channel_state_lock.borrow_parts();
+                       if let Some(mut chan) = channel_state.by_id.remove(channel_id) {
+                               if let Some(short_id) = chan.get_short_channel_id() {
+                                       channel_state.short_to_id.remove(&short_id);
+                               }
+                               chan
+                       } else {
+                               return;
+                       }
+               };
+               self.finish_force_close_channel(chan.force_shutdown());
+               let mut events = self.pending_events.lock().unwrap();
+               if let Ok(update) = self.get_channel_update(&chan) {
+                       events.push(events::Event::BroadcastChannelUpdate {
+                               msg: update
+                       });
+               }
+       }
+
        #[inline]
        fn gen_rho_mu_from_shared_secret(shared_secret: &SharedSecret) -> ([u8; 32], [u8; 32]) {
                ({
@@ -1092,6 +1133,7 @@ impl events::EventsProvider for ChannelManager {
 impl ChainListener for ChannelManager {
        fn block_connected(&self, header: &BlockHeader, height: u32, txn_matched: &[&Transaction], indexes_of_txn_matched: &[u32]) {
                let mut new_events = Vec::new();
+               let mut failed_channels = Vec::new();
                {
                        let mut channel_state = self.channel_state.lock().unwrap();
                        let mut short_to_ids_to_insert = Vec::new();
@@ -1119,7 +1161,10 @@ impl ChainListener for ChannelManager {
                                                                if let Some(short_id) = channel.get_short_channel_id() {
                                                                        short_to_ids_to_remove.push(short_id);
                                                                }
-                                                               channel.force_shutdown();
+                                                               // It looks like our counterparty went on-chain. We go ahead and
+                                                               // broadcast our latest local state as well here, just in case its
+                                                               // some kind of SPV attack, though we expect these to be dropped.
+                                                               failed_channels.push(channel.force_shutdown());
                                                                if let Ok(update) = self.get_channel_update(&channel) {
                                                                        new_events.push(events::Event::BroadcastChannelUpdate {
                                                                                msg: update
@@ -1134,7 +1179,11 @@ impl ChainListener for ChannelManager {
                                        if let Some(short_id) = channel.get_short_channel_id() {
                                                short_to_ids_to_remove.push(short_id);
                                        }
-                                       channel.force_shutdown();
+                                       failed_channels.push(channel.force_shutdown());
+                                       // If would_broadcast_at_height() is true, the channel_monitor will broadcast
+                                       // the latest local tx for us, so we should skip that here (it doesn't really
+                                       // hurt anything, but does make tests a bit simpler).
+                                       failed_channels.last_mut().unwrap().0 = Vec::new();
                                        if let Ok(update) = self.get_channel_update(&channel) {
                                                new_events.push(events::Event::BroadcastChannelUpdate {
                                                        msg: update
@@ -1151,6 +1200,9 @@ impl ChainListener for ChannelManager {
                                channel_state.short_to_id.insert(to_insert.0, to_insert.1);
                        }
                }
+               for failure in failed_channels.drain(..) {
+                       self.finish_force_close_channel(failure);
+               }
                let mut pending_events = self.pending_events.lock().unwrap();
                for funding_locked in new_events.drain(..) {
                        pending_events.push(funding_locked);
@@ -1160,23 +1212,38 @@ impl ChainListener for ChannelManager {
 
        /// We force-close the channel without letting our counterparty participate in the shutdown
        fn block_disconnected(&self, header: &BlockHeader) {
-               let mut channel_lock = self.channel_state.lock().unwrap();
-               let channel_state = channel_lock.borrow_parts();
-               let short_to_id = channel_state.short_to_id;
-               channel_state.by_id.retain(|_,  v| {
-                       if v.block_disconnected(header) {
-                               let tx = v.force_shutdown();
-                               for broadcast_tx in tx {
-                                       self.tx_broadcaster.broadcast_transaction(&broadcast_tx);
-                               }
-                               if let Some(short_id) = v.get_short_channel_id() {
-                                       short_to_id.remove(&short_id);
+               let mut new_events = Vec::new();
+               let mut failed_channels = Vec::new();
+               {
+                       let mut channel_lock = self.channel_state.lock().unwrap();
+                       let channel_state = channel_lock.borrow_parts();
+                       let short_to_id = channel_state.short_to_id;
+                       channel_state.by_id.retain(|_,  v| {
+                               if v.block_disconnected(header) {
+                                       if let Some(short_id) = v.get_short_channel_id() {
+                                               short_to_id.remove(&short_id);
+                                       }
+                                       failed_channels.push(v.force_shutdown());
+                                       if let Ok(update) = self.get_channel_update(&v) {
+                                               new_events.push(events::Event::BroadcastChannelUpdate {
+                                                       msg: update
+                                               });
+                                       }
+                                       false
+                               } else {
+                                       true
                                }
-                               false
-                       } else {
-                               true
+                       });
+               }
+               for failure in failed_channels.drain(..) {
+                       self.finish_force_close_channel(failure);
+               }
+               if !new_events.is_empty() {
+                       let mut pending_events = self.pending_events.lock().unwrap();
+                       for funding_locked in new_events.drain(..) {
+                               pending_events.push(funding_locked);
                        }
-               });
+               }
                self.latest_block_height.fetch_sub(1, Ordering::AcqRel);
        }
 }
@@ -1855,6 +1922,7 @@ impl ChannelMessageHandler for ChannelManager {
 
        fn peer_disconnected(&self, their_node_id: &PublicKey, no_connection_possible: bool) {
                let mut new_events = Vec::new();
+               let mut failed_channels = Vec::new();
                {
                        let mut channel_state_lock = self.channel_state.lock().unwrap();
                        let channel_state = channel_state_lock.borrow_parts();
@@ -1865,10 +1933,7 @@ impl ChannelMessageHandler for ChannelManager {
                                                if let Some(short_id) = chan.get_short_channel_id() {
                                                        short_to_id.remove(&short_id);
                                                }
-                                               let txn_to_broadcast = chan.force_shutdown();
-                                               for tx in txn_to_broadcast {
-                                                       self.tx_broadcaster.broadcast_transaction(&tx);
-                                               }
+                                               failed_channels.push(chan.force_shutdown());
                                                if let Ok(update) = self.get_channel_update(&chan) {
                                                        new_events.push(events::Event::BroadcastChannelUpdate {
                                                                msg: update
@@ -1889,6 +1954,9 @@ impl ChannelMessageHandler for ChannelManager {
                                }
                        }
                }
+               for failure in failed_channels.drain(..) {
+                       self.finish_force_close_channel(failure);
+               }
                if !new_events.is_empty() {
                        let mut pending_events = self.pending_events.lock().unwrap();
                        for event in new_events.drain(..) {
@@ -2880,7 +2948,7 @@ mod tests {
                        let mut node_txn = test_txn_broadcast(&nodes[1], &chan_1, None, HTLCType::NONE);
                        let header = BlockHeader { version: 0x20000000, prev_blockhash: Default::default(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 };
                        nodes[0].chain_monitor.block_connected_with_filtering(&Block { header, txdata: vec![node_txn.drain(..).next().unwrap()] }, 1);
-                       assert_eq!(nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().len(), 0);
+                       test_txn_broadcast(&nodes[0], &chan_1, None, HTLCType::NONE);
                }
                get_announce_close_broadcast_events(&nodes, 0, 1);
                assert_eq!(nodes[0].node.list_channels().len(), 0);
@@ -2895,7 +2963,7 @@ mod tests {
                        let mut node_txn = test_txn_broadcast(&nodes[1], &chan_2, None, HTLCType::TIMEOUT);
                        let header = BlockHeader { version: 0x20000000, prev_blockhash: Default::default(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 };
                        nodes[2].chain_monitor.block_connected_with_filtering(&Block { header, txdata: vec![node_txn.drain(..).next().unwrap()] }, 1);
-                       assert_eq!(nodes[2].tx_broadcaster.txn_broadcasted.lock().unwrap().len(), 0);
+                       test_txn_broadcast(&nodes[2], &chan_2, None, HTLCType::NONE);
                }
                get_announce_close_broadcast_events(&nodes, 1, 2);
                assert_eq!(nodes[1].node.list_channels().len(), 0);
@@ -2990,14 +3058,15 @@ mod tests {
                        nodes[1].chain_monitor.block_connected_with_filtering(&Block { header, txdata: vec![revoked_local_txn[0].clone()] }, 1);
                        {
                                let mut node_txn = nodes[1].tx_broadcaster.txn_broadcasted.lock().unwrap();
-                               assert_eq!(node_txn.len(), 1);
+                               assert_eq!(node_txn.len(), 2);
                                assert_eq!(node_txn[0].input.len(), 1);
 
                                let mut funding_tx_map = HashMap::new();
                                funding_tx_map.insert(revoked_local_txn[0].txid(), revoked_local_txn[0].clone());
                                node_txn[0].verify(&funding_tx_map).unwrap();
-                               node_txn.clear();
+                               node_txn.swap_remove(0);
                        }
+                       test_txn_broadcast(&nodes[1], &chan_5, None, HTLCType::NONE);
 
                        nodes[0].chain_monitor.block_connected_with_filtering(&Block { header, txdata: vec![revoked_local_txn[0].clone()] }, 1);
                        let node_txn = test_txn_broadcast(&nodes[0], &chan_5, Some(revoked_local_txn[0].clone()), HTLCType::TIMEOUT);