Add pending-HTLC-failure to and unify force_shutdown() handling
diff --git a/src/ln/channelmanager.rs b/src/ln/channelmanager.rs
index 45abf69604b54deca266b4fd17a3c26593189d92..af2e5e600993df37a2beac3925aa55dac2302ed5 100644
--- a/src/ln/channelmanager.rs
+++ b/src/ln/channelmanager.rs
@@ -365,6 +365,47 @@ impl ChannelManager {
                Ok(())
        }
 
+       #[inline]
+       fn finish_force_close_channel(&self, shutdown_res: (Vec<Transaction>, Vec<[u8; 32]>)) {
+               let (local_txn, failed_htlcs) = shutdown_res;
+               for payment_hash in failed_htlcs {
+                       // unknown_next_peer: the channel is gone, so we no longer know who the next peer is.
+                       self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), &payment_hash, HTLCFailReason::Reason { failure_code: 0x4000 | 10, data: Vec::new() });
+               }
+               for tx in local_txn {
+                       self.tx_broadcaster.broadcast_transaction(&tx);
+               }
+               //TODO: We need to have a way where outbound HTLC claims can result in us claiming the
+               //now-on-chain HTLC output for ourselves (and, thereafter, passing the HTLC backwards).
+               //TODO: We need to handle monitoring of pending offered HTLCs which just hit the chain and
+               //may be claimed, resulting in us claiming the inbound HTLCs (and back-failing after
+               //timeouts are hit and our claims confirm).
+       }
+
+       /// Force closes a channel, immediately broadcasting the latest local commitment transaction to
+       /// the chain and rejecting new HTLCs on the given channel.
+       pub fn force_close_channel(&self, channel_id: &[u8; 32]) {
+               let mut chan = {
+                       let mut channel_state_lock = self.channel_state.lock().unwrap();
+                       let channel_state = channel_state_lock.borrow_parts();
+                       if let Some(mut chan) = channel_state.by_id.remove(channel_id) {
+                               if let Some(short_id) = chan.get_short_channel_id() {
+                                       channel_state.short_to_id.remove(&short_id);
+                               }
+                               chan
+                       } else {
+                               return;
+                       }
+               };
+               self.finish_force_close_channel(chan.force_shutdown());
+               let mut events = self.pending_events.lock().unwrap();
+               if let Ok(update) = self.get_channel_update(&chan) {
+                       events.push(events::Event::BroadcastChannelUpdate {
+                               msg: update
+                       });
+               }
+       }
+
        #[inline]
        fn gen_rho_mu_from_shared_secret(shared_secret: &SharedSecret) -> ([u8; 32], [u8; 32]) {
                ({
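
The hunk above adds the two new entry points: finish_force_close_channel(), which consumes the (Vec<Transaction>, Vec<[u8; 32]>) tuple that force_shutdown() now returns (broadcasting the local commitment transactions and failing each pending HTLC backwards with unknown_next_peer, i.e. 0x4000 | 10 = PERM | 10), and the public force_close_channel(). A minimal caller-side sketch follows; the helper itself, the import paths, and the event hand-off are assumptions for illustration, not part of the patch:

    // Sketch only: force-close every open channel and relay the resulting
    // channel_update messages. Import paths reflect the crate layout of the
    // time and should be treated as assumptions.
    use lightning::ln::channelmanager::ChannelManager;
    use lightning::util::events::{Event, EventsProvider};

    fn force_close_all(manager: &ChannelManager) {
        for details in manager.list_channels() {
            // Broadcasts the latest local commitment tx and queues a
            // BroadcastChannelUpdate event for the closed channel.
            manager.force_close_channel(&details.channel_id);
        }
        for event in manager.get_and_clear_pending_events() {
            if let Event::BroadcastChannelUpdate { msg } = event {
                let _ = msg; // hand the update to the gossip/router layer here
            }
        }
    }
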
@@ -1092,6 +1133,7 @@ impl events::EventsProvider for ChannelManager {
 impl ChainListener for ChannelManager {
        fn block_connected(&self, header: &BlockHeader, height: u32, txn_matched: &[&Transaction], indexes_of_txn_matched: &[u32]) {
                let mut new_events = Vec::new();
+               let mut failed_channels = Vec::new();
                {
                        let mut channel_state = self.channel_state.lock().unwrap();
                        let mut short_to_ids_to_insert = Vec::new();
@@ -1119,7 +1161,10 @@ impl ChainListener for ChannelManager {
                                                                if let Some(short_id) = channel.get_short_channel_id() {
                                                                        short_to_ids_to_remove.push(short_id);
                                                                }
-                                                               channel.force_shutdown();
+                                                               // It looks like our counterparty went on-chain. We go ahead and
+                                                               // broadcast our latest local state as well here, just in case it's
+                                                               // some kind of SPV attack, though we expect these to be dropped.
+                                                               failed_channels.push(channel.force_shutdown());
                                                                if let Ok(update) = self.get_channel_update(&channel) {
                                                                        new_events.push(events::Event::BroadcastChannelUpdate {
                                                                                msg: update
@@ -1134,7 +1179,11 @@ impl ChainListener for ChannelManager {
                                        if let Some(short_id) = channel.get_short_channel_id() {
                                                short_to_ids_to_remove.push(short_id);
                                        }
-                                       channel.force_shutdown();
+                                       failed_channels.push(channel.force_shutdown());
+                                       // If would_broadcast_at_height() is true, the channel_monitor will broadcast
+                                       // the latest local tx for us, so we should skip that here (it doesn't really
+                                       // hurt anything, but does make tests a bit simpler).
+                                       failed_channels.last_mut().unwrap().0 = Vec::new();
                                        if let Ok(update) = self.get_channel_update(&channel) {
                                                new_events.push(events::Event::BroadcastChannelUpdate {
                                                        msg: update
@@ -1151,6 +1200,9 @@ impl ChainListener for ChannelManager {
                                channel_state.short_to_id.insert(to_insert.0, to_insert.1);
                        }
                }
+               for failure in failed_channels.drain(..) {
+                       self.finish_force_close_channel(failure);
+               }
                let mut pending_events = self.pending_events.lock().unwrap();
                for funding_locked in new_events.drain(..) {
                        pending_events.push(funding_locked);
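
Note the shape of the change in block_connected(): shutdown results are collected into failed_channels while the channel_state lock is held and only passed to finish_force_close_channel() after the lock scope ends, because fail_htlc_backwards_internal() re-takes that same lock. A stripped-down illustration of the pattern (all names here are placeholders, not from the patch):

    use std::sync::Mutex;

    struct Manager {
        state: Mutex<Vec<u64>>, // stands in for channel_state.by_id
    }

    impl Manager {
        fn finish_close(&self, _id: u64) {
            // Like finish_force_close_channel(), this re-locks `state`; calling
            // it while the caller still holds the guard would deadlock.
            let _guard = self.state.lock().unwrap();
        }

        fn on_block(&self) {
            let mut failed = Vec::new();
            {
                let mut state = self.state.lock().unwrap();
                failed.append(&mut *state); // collect while holding the lock
            } // guard dropped here
            for id in failed.drain(..) {
                self.finish_close(id); // safe to re-lock now
            }
        }
    }
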
@@ -1160,23 +1212,38 @@ impl ChainListener for ChannelManager {
 
        /// We force-close the channel without letting our counterparty participate in the shutdown
        fn block_disconnected(&self, header: &BlockHeader) {
-               let mut channel_lock = self.channel_state.lock().unwrap();
-               let channel_state = channel_lock.borrow_parts();
-               let short_to_id = channel_state.short_to_id;
-               channel_state.by_id.retain(|_,  v| {
-                       if v.block_disconnected(header) {
-                               let tx = v.force_shutdown();
-                               for broadcast_tx in tx {
-                                       self.tx_broadcaster.broadcast_transaction(&broadcast_tx);
-                               }
-                               if let Some(short_id) = v.get_short_channel_id() {
-                                       short_to_id.remove(&short_id);
+               let mut new_events = Vec::new();
+               let mut failed_channels = Vec::new();
+               {
+                       let mut channel_lock = self.channel_state.lock().unwrap();
+                       let channel_state = channel_lock.borrow_parts();
+                       let short_to_id = channel_state.short_to_id;
+                       channel_state.by_id.retain(|_,  v| {
+                               if v.block_disconnected(header) {
+                                       if let Some(short_id) = v.get_short_channel_id() {
+                                               short_to_id.remove(&short_id);
+                                       }
+                                       failed_channels.push(v.force_shutdown());
+                                       if let Ok(update) = self.get_channel_update(&v) {
+                                               new_events.push(events::Event::BroadcastChannelUpdate {
+                                                       msg: update
+                                               });
+                                       }
+                                       false
+                               } else {
+                                       true
                                }
-                               false
-                       } else {
-                               true
+                       });
+               }
+               for failure in failed_channels.drain(..) {
+                       self.finish_force_close_channel(failure);
+               }
+               if !new_events.is_empty() {
+                       let mut pending_events = self.pending_events.lock().unwrap();
+                       for funding_locked in new_events.drain(..) {
+                               pending_events.push(funding_locked);
                        }
-               });
+               }
                self.latest_block_height.fetch_sub(1, Ordering::AcqRel);
        }
 }
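
block_connected() and block_disconnected() are the ChainListener half of the change: the block source drives them, and the disconnect path now funnels any channel that Channel::block_disconnected() marks as failed (typically because its funding transaction was reorged out) through the same finish_force_close_channel() cleanup. A hedged sketch of the caller side during a one-block reorg; the helper and the import paths are assumptions, only the trait methods come from the hunks above:

    use bitcoin::blockdata::block::BlockHeader;
    use lightning::chain::chaininterface::ChainListener;

    fn handle_one_block_reorg<L: ChainListener>(listener: &L, stale_tip: &BlockHeader,
                                                new_block: &BlockHeader, new_height: u32) {
        // May force-close channels (e.g. ones whose funding tx was reorged out);
        // their HTLCs are failed backwards once the channel_state lock is released.
        listener.block_disconnected(stale_tip);
        // No matched transactions in this sketch, hence the empty slices.
        listener.block_connected(new_block, new_height, &[], &[]);
    }
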
@@ -1855,6 +1922,7 @@ impl ChannelMessageHandler for ChannelManager {
 
        fn peer_disconnected(&self, their_node_id: &PublicKey, no_connection_possible: bool) {
                let mut new_events = Vec::new();
+               let mut failed_channels = Vec::new();
                {
                        let mut channel_state_lock = self.channel_state.lock().unwrap();
                        let channel_state = channel_state_lock.borrow_parts();
@@ -1865,10 +1933,7 @@ impl ChannelMessageHandler for ChannelManager {
                                                if let Some(short_id) = chan.get_short_channel_id() {
                                                        short_to_id.remove(&short_id);
                                                }
-                                               let txn_to_broadcast = chan.force_shutdown();
-                                               for tx in txn_to_broadcast {
-                                                       self.tx_broadcaster.broadcast_transaction(&tx);
-                                               }
+                                               failed_channels.push(chan.force_shutdown());
                                                if let Ok(update) = self.get_channel_update(&chan) {
                                                        new_events.push(events::Event::BroadcastChannelUpdate {
                                                                msg: update
@@ -1889,6 +1954,9 @@ impl ChannelMessageHandler for ChannelManager {
                                }
                        }
                }
+               for failure in failed_channels.drain(..) {
+                       self.finish_force_close_channel(failure);
+               }
                if !new_events.is_empty() {
                        let mut pending_events = self.pending_events.lock().unwrap();
                        for event in new_events.drain(..) {
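
peer_disconnected() gets the same treatment: with no_connection_possible set, every channel with that peer is force-shut, and its pending HTLCs are now failed backwards via finish_force_close_channel() instead of being dropped after only broadcasting the commitment transactions. A caller-side sketch; the function and import paths are assumptions, only the ChannelMessageHandler method comes from the hunk above:

    use secp256k1::key::PublicKey;
    use lightning::ln::msgs::ChannelMessageHandler;

    fn on_connection_closed<H: ChannelMessageHandler>(handler: &H, their_node_id: &PublicKey,
                                                      permanent: bool) {
        // permanent == true maps to no_connection_possible: channels with this
        // peer are force-closed and their pending HTLCs failed backwards.
        handler.peer_disconnected(their_node_id, permanent);
    }
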
@@ -2880,7 +2948,7 @@ mod tests {
                        let mut node_txn = test_txn_broadcast(&nodes[1], &chan_1, None, HTLCType::NONE);
                        let header = BlockHeader { version: 0x20000000, prev_blockhash: Default::default(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 };
                        nodes[0].chain_monitor.block_connected_with_filtering(&Block { header, txdata: vec![node_txn.drain(..).next().unwrap()] }, 1);
-                       assert_eq!(nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().len(), 0);
+                       test_txn_broadcast(&nodes[0], &chan_1, None, HTLCType::NONE);
                }
                get_announce_close_broadcast_events(&nodes, 0, 1);
                assert_eq!(nodes[0].node.list_channels().len(), 0);
@@ -2895,7 +2963,7 @@ mod tests {
                        let mut node_txn = test_txn_broadcast(&nodes[1], &chan_2, None, HTLCType::TIMEOUT);
                        let header = BlockHeader { version: 0x20000000, prev_blockhash: Default::default(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 };
                        nodes[2].chain_monitor.block_connected_with_filtering(&Block { header, txdata: vec![node_txn.drain(..).next().unwrap()] }, 1);
-                       assert_eq!(nodes[2].tx_broadcaster.txn_broadcasted.lock().unwrap().len(), 0);
+                       test_txn_broadcast(&nodes[2], &chan_2, None, HTLCType::NONE);
                }
                get_announce_close_broadcast_events(&nodes, 1, 2);
                assert_eq!(nodes[1].node.list_channels().len(), 0);
@@ -2990,14 +3058,15 @@ mod tests {
                        nodes[1].chain_monitor.block_connected_with_filtering(&Block { header, txdata: vec![revoked_local_txn[0].clone()] }, 1);
                        {
                                let mut node_txn = nodes[1].tx_broadcaster.txn_broadcasted.lock().unwrap();
-                               assert_eq!(node_txn.len(), 1);
+                               assert_eq!(node_txn.len(), 2);
                                assert_eq!(node_txn[0].input.len(), 1);
 
                                let mut funding_tx_map = HashMap::new();
                                funding_tx_map.insert(revoked_local_txn[0].txid(), revoked_local_txn[0].clone());
                                node_txn[0].verify(&funding_tx_map).unwrap();
-                               node_txn.clear();
+                               node_txn.swap_remove(0);
                        }
+                       test_txn_broadcast(&nodes[1], &chan_5, None, HTLCType::NONE);
 
                        nodes[0].chain_monitor.block_connected_with_filtering(&Block { header, txdata: vec![revoked_local_txn[0].clone()] }, 1);
                        let node_txn = test_txn_broadcast(&nodes[0], &chan_5, Some(revoked_local_txn[0].clone()), HTLCType::TIMEOUT);