Merge pull request #108 from TheBlueMatt/2018-08-fuzz-fixes
[rust-lightning] / src / ln / channelmanager.rs
index 45abf69604b54deca266b4fd17a3c26593189d92..29d260ba6886c8826ff4510f8215a3c097e27d2e 100644 (file)
@@ -19,13 +19,13 @@ use ln::msgs;
 use ln::msgs::{HandleError,ChannelMessageHandler,MsgEncodable,MsgDecodable};
 use util::{byte_utils, events, internal_traits, rng};
 use util::sha2::Sha256;
+use util::chacha20poly1305rfc::ChaCha20;
 
 use crypto;
 use crypto::mac::{Mac,MacResult};
 use crypto::hmac::Hmac;
 use crypto::digest::Digest;
 use crypto::symmetriccipher::SynchronousStreamCipher;
-use crypto::chacha20::ChaCha20;
 
 use std::{ptr, mem};
 use std::collections::HashMap;
@@ -137,13 +137,15 @@ impl ChannelHolder {
                        by_id: &mut self.by_id,
                        short_to_id: &mut self.short_to_id,
                        next_forward: &mut self.next_forward,
-                       /// short channel id -> forward infos. Key of 0 means payments received
                        forward_htlcs: &mut self.forward_htlcs,
                        claimable_htlcs: &mut self.claimable_htlcs,
                }
        }
 }
 
+#[cfg(not(any(target_pointer_width = "32", target_pointer_width = "64")))]
+const ERR: () = "You need at least 32 bit pointers (well, usize, but we'll assume they're the same) for ChannelManager::latest_block_height";
+
 /// Manager which keeps track of a number of channels and sends messages to the appropriate
 /// channel, also tracking HTLC preimages and forwarding onion packets appropriately.
 /// Implements ChannelMessageHandler, handling the multi-channel parts and passing things through
@@ -157,7 +159,7 @@ pub struct ChannelManager {
 
        announce_channels_publicly: bool,
        fee_proportional_millionths: u32,
-       latest_block_height: AtomicUsize, //TODO: Compile-time assert this is at least 32-bits long
+       latest_block_height: AtomicUsize,
        secp_ctx: Secp256k1,
 
        channel_state: Mutex<ChannelHolder>,
@@ -365,6 +367,47 @@ impl ChannelManager {
                Ok(())
        }
 
+       #[inline]
+       fn finish_force_close_channel(&self, shutdown_res: (Vec<Transaction>, Vec<[u8; 32]>)) {
+               let (local_txn, failed_htlcs) = shutdown_res;
+               for payment_hash in failed_htlcs {
+                       // unknown_next_peer...I dunno who that is anymore....
+                       self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), &payment_hash, HTLCFailReason::Reason { failure_code: 0x4000 | 10, data: Vec::new() });
+               }
+               for tx in local_txn {
+                       self.tx_broadcaster.broadcast_transaction(&tx);
+               }
+               //TODO: We need to have a way where outbound HTLC claims can result in us claiming the
+               //now-on-chain HTLC output for ourselves (and, thereafter, passing the HTLC backwards).
+               //TODO: We need to handle monitoring of pending offered HTLCs which just hit the chain and
+               //may be claimed, resulting in us claiming the inbound HTLCs (and back-failing after
+               //timeouts are hit and our claims confirm).
+       }
+
+       /// Force closes a channel, immediately broadcasting the latest local commitment transaction to
+       /// the chain and rejecting new HTLCs on the given channel.
+       pub fn force_close_channel(&self, channel_id: &[u8; 32]) {
+               let mut chan = {
+                       let mut channel_state_lock = self.channel_state.lock().unwrap();
+                       let channel_state = channel_state_lock.borrow_parts();
+                       if let Some(chan) = channel_state.by_id.remove(channel_id) {
+                               if let Some(short_id) = chan.get_short_channel_id() {
+                                       channel_state.short_to_id.remove(&short_id);
+                               }
+                               chan
+                       } else {
+                               return;
+                       }
+               };
+               self.finish_force_close_channel(chan.force_shutdown());
+               let mut events = self.pending_events.lock().unwrap();
+               if let Ok(update) = self.get_channel_update(&chan) {
+                       events.push(events::Event::BroadcastChannelUpdate {
+                               msg: update
+                       });
+               }
+       }
+
        #[inline]
        fn gen_rho_mu_from_shared_secret(shared_secret: &SharedSecret) -> ([u8; 32], [u8; 32]) {
                ({
@@ -1092,12 +1135,14 @@ impl events::EventsProvider for ChannelManager {
 impl ChainListener for ChannelManager {
        fn block_connected(&self, header: &BlockHeader, height: u32, txn_matched: &[&Transaction], indexes_of_txn_matched: &[u32]) {
                let mut new_events = Vec::new();
+               let mut failed_channels = Vec::new();
                {
-                       let mut channel_state = self.channel_state.lock().unwrap();
-                       let mut short_to_ids_to_insert = Vec::new();
-                       let mut short_to_ids_to_remove = Vec::new();
+                       let mut channel_lock = self.channel_state.lock().unwrap();
+                       let channel_state = channel_lock.borrow_parts();
+                       let short_to_id = channel_state.short_to_id;
                        channel_state.by_id.retain(|_, channel| {
-                               if let Some(funding_locked) = channel.block_connected(header, height, txn_matched, indexes_of_txn_matched) {
+                               let chan_res = channel.block_connected(header, height, txn_matched, indexes_of_txn_matched);
+                               if let Ok(Some(funding_locked)) = chan_res {
                                        let announcement_sigs = match self.get_announcement_sigs(channel) {
                                                Ok(res) => res,
                                                Err(_e) => {
@@ -1110,16 +1155,29 @@ impl ChainListener for ChannelManager {
                                                msg: funding_locked,
                                                announcement_sigs: announcement_sigs
                                        });
-                                       short_to_ids_to_insert.push((channel.get_short_channel_id().unwrap(), channel.channel_id()));
+                                       short_to_id.insert(channel.get_short_channel_id().unwrap(), channel.channel_id());
+                               } else if let Err(e) = chan_res {
+                                       if let Some(msgs::ErrorAction::DisconnectPeer { msg }) = e.action {
+                                               new_events.push(events::Event::DisconnectPeer {
+                                                       node_id: channel.get_their_node_id(),
+                                                       msg: msg
+                                               });
+                                       } else { unreachable!(); }
+                                       if channel.is_shutdown() {
+                                               return false;
+                                       }
                                }
                                if let Some(funding_txo) = channel.get_funding_txo() {
                                        for tx in txn_matched {
                                                for inp in tx.input.iter() {
                                                        if inp.prev_hash == funding_txo.txid && inp.prev_index == funding_txo.index as u32 {
                                                                if let Some(short_id) = channel.get_short_channel_id() {
-                                                                       short_to_ids_to_remove.push(short_id);
+                                                                       short_to_id.remove(&short_id);
                                                                }
-                                                               channel.force_shutdown();
+                                                               // It looks like our counterparty went on-chain. We go ahead and
+                                                               // broadcast our latest local state as well here, just in case its
+                                                               // some kind of SPV attack, though we expect these to be dropped.
+                                                               failed_channels.push(channel.force_shutdown());
                                                                if let Ok(update) = self.get_channel_update(&channel) {
                                                                        new_events.push(events::Event::BroadcastChannelUpdate {
                                                                                msg: update
@@ -1130,11 +1188,15 @@ impl ChainListener for ChannelManager {
                                                }
                                        }
                                }
-                               if channel.channel_monitor().would_broadcast_at_height(height) {
+                               if channel.is_funding_initiated() && channel.channel_monitor().would_broadcast_at_height(height) {
                                        if let Some(short_id) = channel.get_short_channel_id() {
-                                               short_to_ids_to_remove.push(short_id);
+                                               short_to_id.remove(&short_id);
                                        }
-                                       channel.force_shutdown();
+                                       failed_channels.push(channel.force_shutdown());
+                                       // If would_broadcast_at_height() is true, the channel_monitor will broadcast
+                                       // the latest local tx for us, so we should skip that here (it doesn't really
+                                       // hurt anything, but does make tests a bit simpler).
+                                       failed_channels.last_mut().unwrap().0 = Vec::new();
                                        if let Ok(update) = self.get_channel_update(&channel) {
                                                new_events.push(events::Event::BroadcastChannelUpdate {
                                                        msg: update
@@ -1144,12 +1206,9 @@ impl ChainListener for ChannelManager {
                                }
                                true
                        });
-                       for to_remove in short_to_ids_to_remove {
-                               channel_state.short_to_id.remove(&to_remove);
-                       }
-                       for to_insert in short_to_ids_to_insert {
-                               channel_state.short_to_id.insert(to_insert.0, to_insert.1);
-                       }
+               }
+               for failure in failed_channels.drain(..) {
+                       self.finish_force_close_channel(failure);
                }
                let mut pending_events = self.pending_events.lock().unwrap();
                for funding_locked in new_events.drain(..) {
@@ -1160,23 +1219,38 @@ impl ChainListener for ChannelManager {
 
        /// We force-close the channel without letting our counterparty participate in the shutdown
        fn block_disconnected(&self, header: &BlockHeader) {
-               let mut channel_lock = self.channel_state.lock().unwrap();
-               let channel_state = channel_lock.borrow_parts();
-               let short_to_id = channel_state.short_to_id;
-               channel_state.by_id.retain(|_,  v| {
-                       if v.block_disconnected(header) {
-                               let tx = v.force_shutdown();
-                               for broadcast_tx in tx {
-                                       self.tx_broadcaster.broadcast_transaction(&broadcast_tx);
-                               }
-                               if let Some(short_id) = v.get_short_channel_id() {
-                                       short_to_id.remove(&short_id);
+               let mut new_events = Vec::new();
+               let mut failed_channels = Vec::new();
+               {
+                       let mut channel_lock = self.channel_state.lock().unwrap();
+                       let channel_state = channel_lock.borrow_parts();
+                       let short_to_id = channel_state.short_to_id;
+                       channel_state.by_id.retain(|_,  v| {
+                               if v.block_disconnected(header) {
+                                       if let Some(short_id) = v.get_short_channel_id() {
+                                               short_to_id.remove(&short_id);
+                                       }
+                                       failed_channels.push(v.force_shutdown());
+                                       if let Ok(update) = self.get_channel_update(&v) {
+                                               new_events.push(events::Event::BroadcastChannelUpdate {
+                                                       msg: update
+                                               });
+                                       }
+                                       false
+                               } else {
+                                       true
                                }
-                               false
-                       } else {
-                               true
+                       });
+               }
+               for failure in failed_channels.drain(..) {
+                       self.finish_force_close_channel(failure);
+               }
+               if !new_events.is_empty() {
+                       let mut pending_events = self.pending_events.lock().unwrap();
+                       for funding_locked in new_events.drain(..) {
+                               pending_events.push(funding_locked);
                        }
-               });
+               }
                self.latest_block_height.fetch_sub(1, Ordering::AcqRel);
        }
 }
@@ -1194,13 +1268,13 @@ impl ChannelMessageHandler for ChannelManager {
 
                let chan_keys = if cfg!(feature = "fuzztarget") {
                        ChannelKeys {
-                               funding_key:               SecretKey::from_slice(&self.secp_ctx, &[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]).unwrap(),
-                               revocation_base_key:       SecretKey::from_slice(&self.secp_ctx, &[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]).unwrap(),
-                               payment_base_key:          SecretKey::from_slice(&self.secp_ctx, &[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]).unwrap(),
-                               delayed_payment_base_key:  SecretKey::from_slice(&self.secp_ctx, &[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]).unwrap(),
-                               htlc_base_key:             SecretKey::from_slice(&self.secp_ctx, &[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]).unwrap(),
-                               channel_close_key:         SecretKey::from_slice(&self.secp_ctx, &[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]).unwrap(),
-                               channel_monitor_claim_key: SecretKey::from_slice(&self.secp_ctx, &[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]).unwrap(),
+                               funding_key:               SecretKey::from_slice(&self.secp_ctx, &[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0]).unwrap(),
+                               revocation_base_key:       SecretKey::from_slice(&self.secp_ctx, &[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 0]).unwrap(),
+                               payment_base_key:          SecretKey::from_slice(&self.secp_ctx, &[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 3, 0]).unwrap(),
+                               delayed_payment_base_key:  SecretKey::from_slice(&self.secp_ctx, &[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 4, 0]).unwrap(),
+                               htlc_base_key:             SecretKey::from_slice(&self.secp_ctx, &[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 5, 0]).unwrap(),
+                               channel_close_key:         SecretKey::from_slice(&self.secp_ctx, &[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 6, 0]).unwrap(),
+                               channel_monitor_claim_key: SecretKey::from_slice(&self.secp_ctx, &[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 7, 0]).unwrap(),
                                commitment_seed: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
                        }
                } else {
@@ -1561,20 +1635,14 @@ impl ChannelMessageHandler for ChannelManager {
                // destination. That's OK since those nodes are probably busted or trying to do network
                // mapping through repeated loops. In either case, we want them to stop talking to us, so
                // we send permanent_node_failure.
-               match &claimable_htlcs_entry {
-                       &hash_map::Entry::Occupied(ref e) => {
-                               let mut acceptable_cycle = false;
-                               match e.get() {
-                                       &PendingOutboundHTLC::OutboundRoute { .. } => {
-                                               acceptable_cycle = pending_forward_info.short_channel_id == 0;
-                                       },
-                                       _ => {},
-                               }
-                               if !acceptable_cycle {
-                                       return_err!("Payment looped through us twice", 0x4000 | 0x2000 | 2, &[0;0]);
-                               }
-                       },
-                       _ => {},
+               if let &hash_map::Entry::Occupied(ref e) = &claimable_htlcs_entry {
+                       let mut acceptable_cycle = false;
+                       if let &PendingOutboundHTLC::OutboundRoute { .. } = e.get() {
+                               acceptable_cycle = pending_forward_info.short_channel_id == 0;
+                       }
+                       if !acceptable_cycle {
+                               return_err!("Payment looped through us twice", 0x4000 | 0x2000 | 2, &[0;0]);
+                       }
                }
 
                let (source_short_channel_id, res) = match channel_state.by_id.get_mut(&msg.channel_id) {
@@ -1625,22 +1693,16 @@ impl ChannelMessageHandler for ChannelManager {
                // is broken, we may have enough info to get our own money!
                self.claim_funds_internal(msg.payment_preimage.clone(), false);
 
-               let monitor = {
-                       let mut channel_state = self.channel_state.lock().unwrap();
-                       match channel_state.by_id.get_mut(&msg.channel_id) {
-                               Some(chan) => {
-                                       if chan.get_their_node_id() != *their_node_id {
-                                               return Err(HandleError{err: "Got a message for a channel from the wrong node!", action: None})
-                                       }
-                                       chan.update_fulfill_htlc(&msg)?
-                               },
-                               None => return Err(HandleError{err: "Failed to find corresponding channel", action: None})
-                       }
-               };
-               if let Err(_e) = self.monitor.add_update_monitor(monitor.get_funding_txo().unwrap(), monitor) {
-                       unimplemented!();
+               let mut channel_state = self.channel_state.lock().unwrap();
+               match channel_state.by_id.get_mut(&msg.channel_id) {
+                       Some(chan) => {
+                               if chan.get_their_node_id() != *their_node_id {
+                                       return Err(HandleError{err: "Got a message for a channel from the wrong node!", action: None})
+                               }
+                               chan.update_fulfill_htlc(&msg)
+                       },
+                       None => return Err(HandleError{err: "Failed to find corresponding channel", action: None})
                }
-               Ok(())
        }
 
        fn handle_update_fail_htlc(&self, their_node_id: &PublicKey, msg: &msgs::UpdateFailHTLC) -> Result<Option<msgs::HTLCFailChannelUpdate>, HandleError> {
@@ -1855,6 +1917,7 @@ impl ChannelMessageHandler for ChannelManager {
 
        fn peer_disconnected(&self, their_node_id: &PublicKey, no_connection_possible: bool) {
                let mut new_events = Vec::new();
+               let mut failed_channels = Vec::new();
                {
                        let mut channel_state_lock = self.channel_state.lock().unwrap();
                        let channel_state = channel_state_lock.borrow_parts();
@@ -1865,10 +1928,7 @@ impl ChannelMessageHandler for ChannelManager {
                                                if let Some(short_id) = chan.get_short_channel_id() {
                                                        short_to_id.remove(&short_id);
                                                }
-                                               let txn_to_broadcast = chan.force_shutdown();
-                                               for tx in txn_to_broadcast {
-                                                       self.tx_broadcaster.broadcast_transaction(&tx);
-                                               }
+                                               failed_channels.push(chan.force_shutdown());
                                                if let Ok(update) = self.get_channel_update(&chan) {
                                                        new_events.push(events::Event::BroadcastChannelUpdate {
                                                                msg: update
@@ -1889,6 +1949,9 @@ impl ChannelMessageHandler for ChannelManager {
                                }
                        }
                }
+               for failure in failed_channels.drain(..) {
+                       self.finish_force_close_channel(failure);
+               }
                if !new_events.is_empty() {
                        let mut pending_events = self.pending_events.lock().unwrap();
                        for event in new_events.drain(..) {
@@ -2434,10 +2497,9 @@ mod tests {
                                        {
                                                let mut added_monitors = $node.chan_monitor.added_monitors.lock().unwrap();
                                                if $last_node {
-                                                       assert_eq!(added_monitors.len(), 1);
+                                                       assert_eq!(added_monitors.len(), 0);
                                                } else {
-                                                       assert_eq!(added_monitors.len(), 2);
-                                                       assert!(added_monitors[0].0 != added_monitors[1].0);
+                                                       assert_eq!(added_monitors.len(), 1);
                                                }
                                                added_monitors.clear();
                                        }
@@ -2880,7 +2942,7 @@ mod tests {
                        let mut node_txn = test_txn_broadcast(&nodes[1], &chan_1, None, HTLCType::NONE);
                        let header = BlockHeader { version: 0x20000000, prev_blockhash: Default::default(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 };
                        nodes[0].chain_monitor.block_connected_with_filtering(&Block { header, txdata: vec![node_txn.drain(..).next().unwrap()] }, 1);
-                       assert_eq!(nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().len(), 0);
+                       test_txn_broadcast(&nodes[0], &chan_1, None, HTLCType::NONE);
                }
                get_announce_close_broadcast_events(&nodes, 0, 1);
                assert_eq!(nodes[0].node.list_channels().len(), 0);
@@ -2895,7 +2957,7 @@ mod tests {
                        let mut node_txn = test_txn_broadcast(&nodes[1], &chan_2, None, HTLCType::TIMEOUT);
                        let header = BlockHeader { version: 0x20000000, prev_blockhash: Default::default(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 };
                        nodes[2].chain_monitor.block_connected_with_filtering(&Block { header, txdata: vec![node_txn.drain(..).next().unwrap()] }, 1);
-                       assert_eq!(nodes[2].tx_broadcaster.txn_broadcasted.lock().unwrap().len(), 0);
+                       test_txn_broadcast(&nodes[2], &chan_2, None, HTLCType::NONE);
                }
                get_announce_close_broadcast_events(&nodes, 1, 2);
                assert_eq!(nodes[1].node.list_channels().len(), 0);
@@ -2990,14 +3052,15 @@ mod tests {
                        nodes[1].chain_monitor.block_connected_with_filtering(&Block { header, txdata: vec![revoked_local_txn[0].clone()] }, 1);
                        {
                                let mut node_txn = nodes[1].tx_broadcaster.txn_broadcasted.lock().unwrap();
-                               assert_eq!(node_txn.len(), 1);
+                               assert_eq!(node_txn.len(), 2);
                                assert_eq!(node_txn[0].input.len(), 1);
 
                                let mut funding_tx_map = HashMap::new();
                                funding_tx_map.insert(revoked_local_txn[0].txid(), revoked_local_txn[0].clone());
                                node_txn[0].verify(&funding_tx_map).unwrap();
-                               node_txn.clear();
+                               node_txn.swap_remove(0);
                        }
+                       test_txn_broadcast(&nodes[1], &chan_5, None, HTLCType::NONE);
 
                        nodes[0].chain_monitor.block_connected_with_filtering(&Block { header, txdata: vec![revoked_local_txn[0].clone()] }, 1);
                        let node_txn = test_txn_broadcast(&nodes[0], &chan_5, Some(revoked_local_txn[0].clone()), HTLCType::TIMEOUT);