Drop MsgEncodable in favor of default fns on Writeable
[rust-lightning] / src / ln / channelmanager.rs
index a2b3b242b8aa1e47119afbed5dabeb37beb33f13..fbf4b1128997532257187d170b5ccd3ae1050d32 100644 (file)
@@ -16,9 +16,10 @@ use ln::channel::{Channel, ChannelKeys};
 use ln::channelmonitor::ManyChannelMonitor;
 use ln::router::{Route,RouteHop};
 use ln::msgs;
-use ln::msgs::{HandleError,ChannelMessageHandler,MsgEncodable,MsgDecodable};
+use ln::msgs::{HandleError,ChannelMessageHandler};
 use util::{byte_utils, events, internal_traits, rng};
 use util::sha2::Sha256;
+use util::ser::{Readable, Writeable};
 use util::chacha20poly1305rfc::ChaCha20;
 use util::logger::Logger;
 use util::errors::APIError;
@@ -32,6 +33,7 @@ use crypto::symmetriccipher::SynchronousStreamCipher;
 use std::{ptr, mem};
 use std::collections::HashMap;
 use std::collections::hash_map;
+use std::io::Cursor;
 use std::sync::{Mutex,MutexGuard,Arc};
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::time::{Instant,Duration};
@@ -369,7 +371,7 @@ impl ChannelManager {
                };
 
                let channel = Channel::new_outbound(&*self.fee_estimator, chan_keys, their_network_key, channel_value_satoshis, push_msat, self.announce_channels_publicly, user_id, Arc::clone(&self.logger))?;
-               let res = channel.get_open_channel(self.genesis_hash.clone(), &*self.fee_estimator)?;
+               let res = channel.get_open_channel(self.genesis_hash.clone(), &*self.fee_estimator);
                let mut channel_state = self.channel_state.lock().unwrap();
                match channel_state.by_id.insert(channel.channel_id(), channel) {
                        Some(_) => panic!("RNG is bad???"),
@@ -819,7 +821,7 @@ impl ChannelManager {
                let next_hop_data = {
                        let mut decoded = [0; 65];
                        chacha.process(&msg.onion_routing_packet.hop_data[0..65], &mut decoded);
-                       match msgs::OnionHopData::decode(&decoded[..]) {
+                       match msgs::OnionHopData::read(&mut Cursor::new(&decoded[..])) {
                                Err(err) => {
                                        let error_code = match err {
                                                msgs::DecodeError::UnknownRealmByte => 0x4000 | 1,
@@ -1218,7 +1220,7 @@ impl ChannelManager {
                                                };
                                                match channel_state.claimable_htlcs.entry(forward_info.payment_hash) {
                                                        hash_map::Entry::Occupied(mut entry) => entry.get_mut().push(prev_hop_data),
-                                                       hash_map::Entry::Vacant(mut entry) => { entry.insert(vec![prev_hop_data]); },
+                                                       hash_map::Entry::Vacant(entry) => { entry.insert(vec![prev_hop_data]); },
                                                };
                                                new_events.push((None, events::Event::PaymentReceived {
                                                        payment_hash: forward_info.payment_hash,
@@ -1717,7 +1719,7 @@ impl ChannelManager {
                                        chacha.process(&packet_decrypted, &mut decryption_tmp[..]);
                                        packet_decrypted = decryption_tmp;
 
-                                       if let Ok(err_packet) = msgs::DecodedOnionErrorPacket::decode(&packet_decrypted) {
+                                       if let Ok(err_packet) = msgs::DecodedOnionErrorPacket::read(&mut Cursor::new(&packet_decrypted)) {
                                                if err_packet.failuremsg.len() >= 2 {
                                                        let um = ChannelManager::gen_um_from_shared_secret(&shared_secret);
 
@@ -1733,7 +1735,7 @@ impl ChannelManager {
                                                                                if err_packet.failuremsg.len() >= 4 {
                                                                                        let update_len = byte_utils::slice_to_be16(&err_packet.failuremsg[2..4]) as usize;
                                                                                        if err_packet.failuremsg.len() >= 4 + update_len {
-                                                                                               if let Ok(chan_update) = msgs::ChannelUpdate::decode(&err_packet.failuremsg[4..4 + update_len]) {
+                                                                                               if let Ok(chan_update) = msgs::ChannelUpdate::read(&mut Cursor::new(&err_packet.failuremsg[4..4 + update_len])) {
                                                                                                        res = Some(msgs::HTLCFailChannelUpdate::ChannelUpdateMessage {
                                                                                                                msg: chan_update,
                                                                                                        });
@@ -1902,7 +1904,27 @@ impl ChannelManager {
                Ok(())
        }
 
-
+       fn internal_channel_reestablish(&self, their_node_id: &PublicKey, msg: &msgs::ChannelReestablish) -> Result<(Option<msgs::FundingLocked>, Option<msgs::RevokeAndACK>, Option<msgs::CommitmentUpdate>), MsgHandleErrInternal> {
+               let (res, chan_monitor) = {
+                       let mut channel_state = self.channel_state.lock().unwrap();
+                       match channel_state.by_id.get_mut(&msg.channel_id) {
+                               Some(chan) => {
+                                       if chan.get_their_node_id() != *their_node_id {
+                                               return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!", msg.channel_id));
+                                       }
+                                       let (funding_locked, revoke_and_ack, commitment_update, channel_monitor) = chan.channel_reestablish(msg).map_err(|e| MsgHandleErrInternal::from_maybe_close(e))?;
+                                       (Ok((funding_locked, revoke_and_ack, commitment_update)), channel_monitor)
+                               },
+                               None => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel", msg.channel_id))
+                       }
+               };
+               if let Some(monitor) = chan_monitor {
+                       if let Err(_e) = self.monitor.add_update_monitor(monitor.get_funding_txo().unwrap(), monitor) {
+                               unimplemented!();
+                       }
+               }
+               res
+       }
 }
 
 impl events::EventsProvider for ChannelManager {
@@ -2123,9 +2145,14 @@ impl ChannelMessageHandler for ChannelManager {
                handle_error!(self, self.internal_announcement_signatures(their_node_id, msg), their_node_id)
        }
 
+       fn handle_channel_reestablish(&self, their_node_id: &PublicKey, msg: &msgs::ChannelReestablish) -> Result<(Option<msgs::FundingLocked>, Option<msgs::RevokeAndACK>, Option<msgs::CommitmentUpdate>), HandleError> {
+               handle_error!(self, self.internal_channel_reestablish(their_node_id, msg), their_node_id)
+       }
+
        fn peer_disconnected(&self, their_node_id: &PublicKey, no_connection_possible: bool) {
                let mut new_events = Vec::new();
                let mut failed_channels = Vec::new();
+               let mut failed_payments = Vec::new();
                {
                        let mut channel_state_lock = self.channel_state.lock().unwrap();
                        let channel_state = channel_state_lock.borrow_parts();
@@ -2148,13 +2175,23 @@ impl ChannelMessageHandler for ChannelManager {
                                        }
                                });
                        } else {
-                               for chan in channel_state.by_id {
-                                       if chan.1.get_their_node_id() == *their_node_id {
-                                               //TODO: mark channel disabled (and maybe announce such after a timeout). Also
-                                               //fail and wipe any uncommitted outbound HTLCs as those are considered after
-                                               //reconnect.
+                               channel_state.by_id.retain(|_, chan| {
+                                       if chan.get_their_node_id() == *their_node_id {
+                                               //TODO: mark channel disabled (and maybe announce such after a timeout).
+                                               let failed_adds = chan.remove_uncommitted_htlcs_and_mark_paused();
+                                               if !failed_adds.is_empty() {
+                                                       let chan_update = self.get_channel_update(&chan).map(|u| u.encode_with_len()).unwrap(); // Cannot add/recv HTLCs before we have a short_id so unwrap is safe
+                                                       failed_payments.push((chan_update, failed_adds));
+                                               }
+                                               if chan.is_shutdown() {
+                                                       if let Some(short_id) = chan.get_short_channel_id() {
+                                                               short_to_id.remove(&short_id);
+                                                       }
+                                                       return false;
+                                               }
                                        }
-                               }
+                                       true
+                               })
                        }
                }
                for failure in failed_channels.drain(..) {
@@ -2166,6 +2203,32 @@ impl ChannelMessageHandler for ChannelManager {
                                pending_events.push(event);
                        }
                }
+               for (chan_update, mut htlc_sources) in failed_payments {
+                       for (htlc_source, payment_hash) in htlc_sources.drain(..) {
+                               self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), htlc_source, &payment_hash, HTLCFailReason::Reason { failure_code: 0x1000 | 7, data: chan_update.clone() });
+                       }
+               }
+       }
+
+       fn peer_connected(&self, their_node_id: &PublicKey) -> Vec<msgs::ChannelReestablish> {
+               let mut res = Vec::new();
+               let mut channel_state = self.channel_state.lock().unwrap();
+               channel_state.by_id.retain(|_, chan| {
+                       if chan.get_their_node_id() == *their_node_id {
+                               if !chan.have_received_message() {
+                                       // If we created this (outbound) channel while we were disconnected from the
+                                       // peer we probably failed to send the open_channel message, which is now
+                                       // lost. We can't have had anything pending related to this channel, so we just
+                                       // drop it.
+                                       false
+                               } else {
+                                       res.push(chan.get_channel_reestablish());
+                                       true
+                               }
+                       } else { true }
+               });
+               //TODO: Also re-broadcast announcement_signatures
+               res
        }
 
        fn handle_error(&self, their_node_id: &PublicKey, msg: &msgs::ErrorMessage) {
@@ -2189,11 +2252,12 @@ mod tests {
        use ln::channelmanager::{ChannelManager,OnionKeys};
        use ln::router::{Route, RouteHop, Router};
        use ln::msgs;
-       use ln::msgs::{MsgEncodable,ChannelMessageHandler,RoutingMessageHandler};
+       use ln::msgs::{ChannelMessageHandler,RoutingMessageHandler};
        use util::test_utils;
        use util::events::{Event, EventsProvider};
-       use util::logger::Logger;
        use util::errors::APIError;
+       use util::logger::Logger;
+       use util::ser::Writeable;
 
        use bitcoin::util::hash::Sha256dHash;
        use bitcoin::blockdata::block::{Block, BlockHeader};
@@ -2390,6 +2454,13 @@ mod tests {
                network_payment_count: Rc<RefCell<u8>>,
                network_chan_count: Rc<RefCell<u32>>,
        }
+       impl Drop for Node {
+               fn drop(&mut self) {
+                       // Check that we processed all pending events
+                       assert_eq!(self.node.get_and_clear_pending_events().len(), 0);
+                       assert_eq!(self.chan_monitor.added_monitors.lock().unwrap().len(), 0);
+               }
+       }
 
        fn create_chan_between_nodes(node_a: &Node, node_b: &Node) -> (msgs::ChannelAnnouncement, msgs::ChannelUpdate, msgs::ChannelUpdate, [u8; 32], Transaction) {
                node_a.node.create_channel(node_b.node.get_our_node_id(), 100000, 10001, 42).unwrap();
@@ -2614,6 +2685,54 @@ mod tests {
                }
        }
 
+       macro_rules! commitment_signed_dance {
+               ($node_a: expr, $node_b: expr, $commitment_signed: expr, $fail_backwards: expr) => {
+                       {
+                               {
+                                       let added_monitors = $node_a.chan_monitor.added_monitors.lock().unwrap();
+                                       assert!(added_monitors.is_empty());
+                               }
+                               let (as_revoke_and_ack, as_commitment_signed) = $node_a.node.handle_commitment_signed(&$node_b.node.get_our_node_id(), &$commitment_signed).unwrap();
+                               {
+                                       let mut added_monitors = $node_a.chan_monitor.added_monitors.lock().unwrap();
+                                       assert_eq!(added_monitors.len(), 1);
+                                       added_monitors.clear();
+                               }
+                               {
+                                       let added_monitors = $node_b.chan_monitor.added_monitors.lock().unwrap();
+                                       assert!(added_monitors.is_empty());
+                               }
+                               assert!($node_b.node.handle_revoke_and_ack(&$node_a.node.get_our_node_id(), &as_revoke_and_ack).unwrap().is_none());
+                               {
+                                       let mut added_monitors = $node_b.chan_monitor.added_monitors.lock().unwrap();
+                                       assert_eq!(added_monitors.len(), 1);
+                                       added_monitors.clear();
+                               }
+                               let (bs_revoke_and_ack, bs_none) = $node_b.node.handle_commitment_signed(&$node_a.node.get_our_node_id(), &as_commitment_signed.unwrap()).unwrap();
+                               assert!(bs_none.is_none());
+                               {
+                                       let mut added_monitors = $node_b.chan_monitor.added_monitors.lock().unwrap();
+                                       assert_eq!(added_monitors.len(), 1);
+                                       added_monitors.clear();
+                               }
+                               if $fail_backwards {
+                                       assert!($node_a.node.get_and_clear_pending_events().is_empty());
+                               }
+                               assert!($node_a.node.handle_revoke_and_ack(&$node_b.node.get_our_node_id(), &bs_revoke_and_ack).unwrap().is_none());
+                               {
+                                       let mut added_monitors = $node_a.chan_monitor.added_monitors.lock().unwrap();
+                                       if $fail_backwards {
+                                               assert_eq!(added_monitors.len(), 2);
+                                               assert!(added_monitors[0].0 != added_monitors[1].0);
+                                       } else {
+                                               assert_eq!(added_monitors.len(), 1);
+                                       }
+                                       added_monitors.clear();
+                               }
+                       }
+               }
+       }
+
        fn send_along_route(origin_node: &Node, route: Route, expected_route: &[&Node], recv_value: u64) -> ([u8; 32], [u8; 32]) {
                let our_payment_preimage = [*origin_node.network_payment_count.borrow(); 32];
                *origin_node.network_payment_count.borrow_mut() += 1;
@@ -2648,26 +2767,7 @@ mod tests {
                                assert_eq!(added_monitors.len(), 0);
                        }
 
-                       let revoke_and_ack = node.node.handle_commitment_signed(&prev_node.node.get_our_node_id(), &payment_event.commitment_msg).unwrap();
-                       {
-                               let mut added_monitors = node.chan_monitor.added_monitors.lock().unwrap();
-                               assert_eq!(added_monitors.len(), 1);
-                               added_monitors.clear();
-                       }
-                       assert!(prev_node.node.handle_revoke_and_ack(&node.node.get_our_node_id(), &revoke_and_ack.0).unwrap().is_none());
-                       let prev_revoke_and_ack = prev_node.node.handle_commitment_signed(&node.node.get_our_node_id(), &revoke_and_ack.1.unwrap()).unwrap();
-                       {
-                               let mut added_monitors = prev_node.chan_monitor.added_monitors.lock().unwrap();
-                               assert_eq!(added_monitors.len(), 2);
-                               added_monitors.clear();
-                       }
-                       assert!(node.node.handle_revoke_and_ack(&prev_node.node.get_our_node_id(), &prev_revoke_and_ack.0).unwrap().is_none());
-                       assert!(prev_revoke_and_ack.1.is_none());
-                       {
-                               let mut added_monitors = node.chan_monitor.added_monitors.lock().unwrap();
-                               assert_eq!(added_monitors.len(), 1);
-                               added_monitors.clear();
-                       }
+                       commitment_signed_dance!(node, prev_node, payment_event.commitment_msg, false);
 
                        let events_1 = node.node.get_and_clear_pending_events();
                        assert_eq!(events_1.len(), 1);
@@ -2705,7 +2805,7 @@ mod tests {
                (our_payment_preimage, our_payment_hash)
        }
 
-       fn claim_payment(origin_node: &Node, expected_route: &[&Node], our_payment_preimage: [u8; 32]) {
+       fn claim_payment_along_route(origin_node: &Node, expected_route: &[&Node], skip_last: bool, our_payment_preimage: [u8; 32]) {
                assert!(expected_route.last().unwrap().node.claim_funds(our_payment_preimage));
                {
                        let mut added_monitors = expected_route.last().unwrap().chan_monitor.added_monitors.lock().unwrap();
@@ -2727,66 +2827,58 @@ mod tests {
                                                }
                                                added_monitors.clear();
                                        }
-                                       let revoke_and_commit = $node.node.handle_commitment_signed(&$prev_node.node.get_our_node_id(), &next_msgs.as_ref().unwrap().1).unwrap();
-                                       {
-                                               let mut added_monitors = $node.chan_monitor.added_monitors.lock().unwrap();
-                                               assert_eq!(added_monitors.len(), 1);
-                                               added_monitors.clear();
-                                       }
-                                       assert!($prev_node.node.handle_revoke_and_ack(&$node.node.get_our_node_id(), &revoke_and_commit.0).unwrap().is_none());
-                                       let revoke_and_ack = $prev_node.node.handle_commitment_signed(&$node.node.get_our_node_id(), &revoke_and_commit.1.unwrap()).unwrap();
-                                       assert!(revoke_and_ack.1.is_none());
-                                       {
-                                               let mut added_monitors = $prev_node.chan_monitor.added_monitors.lock().unwrap();
-                                               assert_eq!(added_monitors.len(), 2);
-                                               added_monitors.clear();
-                                       }
-                                       assert!($node.node.handle_revoke_and_ack(&$prev_node.node.get_our_node_id(), &revoke_and_ack.0).unwrap().is_none());
-                                       {
-                                               let mut added_monitors = $node.chan_monitor.added_monitors.lock().unwrap();
-                                               assert_eq!(added_monitors.len(), 1);
-                                               added_monitors.clear();
-                                       }
+                                       commitment_signed_dance!($node, $prev_node, next_msgs.as_ref().unwrap().1, false);
                                }
                        }
                }
 
                let mut expected_next_node = expected_route.last().unwrap().node.get_our_node_id();
                let mut prev_node = expected_route.last().unwrap();
-               for node in expected_route.iter().rev() {
+               for (idx, node) in expected_route.iter().rev().enumerate() {
                        assert_eq!(expected_next_node, node.node.get_our_node_id());
                        if next_msgs.is_some() {
                                update_fulfill_dance!(node, prev_node, false);
                        }
 
                        let events = node.node.get_and_clear_pending_events();
+                       if !skip_last || idx != expected_route.len() - 1 {
+                               assert_eq!(events.len(), 1);
+                               match events[0] {
+                                       Event::UpdateHTLCs { ref node_id, updates: msgs::CommitmentUpdate { ref update_add_htlcs, ref update_fulfill_htlcs, ref update_fail_htlcs, ref update_fail_malformed_htlcs, ref commitment_signed } } => {
+                                               assert!(update_add_htlcs.is_empty());
+                                               assert_eq!(update_fulfill_htlcs.len(), 1);
+                                               assert!(update_fail_htlcs.is_empty());
+                                               assert!(update_fail_malformed_htlcs.is_empty());
+                                               expected_next_node = node_id.clone();
+                                               next_msgs = Some((update_fulfill_htlcs[0].clone(), commitment_signed.clone()));
+                                       },
+                                       _ => panic!("Unexpected event"),
+                               }
+                       } else {
+                               assert!(events.is_empty());
+                       }
+                       if !skip_last && idx == expected_route.len() - 1 {
+                               assert_eq!(expected_next_node, origin_node.node.get_our_node_id());
+                       }
+
+                       prev_node = node;
+               }
+
+               if !skip_last {
+                       update_fulfill_dance!(origin_node, expected_route.first().unwrap(), true);
+                       let events = origin_node.node.get_and_clear_pending_events();
                        assert_eq!(events.len(), 1);
                        match events[0] {
-                               Event::UpdateHTLCs { ref node_id, updates: msgs::CommitmentUpdate { ref update_add_htlcs, ref update_fulfill_htlcs, ref update_fail_htlcs, ref update_fail_malformed_htlcs, ref commitment_signed } } => {
-                                       assert!(update_add_htlcs.is_empty());
-                                       assert_eq!(update_fulfill_htlcs.len(), 1);
-                                       assert!(update_fail_htlcs.is_empty());
-                                       assert!(update_fail_malformed_htlcs.is_empty());
-                                       expected_next_node = node_id.clone();
-                                       next_msgs = Some((update_fulfill_htlcs[0].clone(), commitment_signed.clone()));
+                               Event::PaymentSent { payment_preimage } => {
+                                       assert_eq!(payment_preimage, our_payment_preimage);
                                },
                                _ => panic!("Unexpected event"),
-                       };
-
-                       prev_node = node;
+                       }
                }
+       }
 
-               assert_eq!(expected_next_node, origin_node.node.get_our_node_id());
-               update_fulfill_dance!(origin_node, expected_route.first().unwrap(), true);
-
-               let events = origin_node.node.get_and_clear_pending_events();
-               assert_eq!(events.len(), 1);
-               match events[0] {
-                       Event::PaymentSent { payment_preimage } => {
-                               assert_eq!(payment_preimage, our_payment_preimage);
-                       },
-                       _ => panic!("Unexpected event"),
-               }
+       fn claim_payment(origin_node: &Node, expected_route: &[&Node], our_payment_preimage: [u8; 32]) {
+               claim_payment_along_route(origin_node, expected_route, false, our_payment_preimage);
        }
 
        const TEST_FINAL_CLTV: u32 = 32;
@@ -2830,7 +2922,7 @@ mod tests {
                claim_payment(&origin, expected_route, our_payment_preimage);
        }
 
-       fn fail_payment(origin_node: &Node, expected_route: &[&Node], our_payment_hash: [u8; 32]) {
+       fn fail_payment_along_route(origin_node: &Node, expected_route: &[&Node], skip_last: bool, our_payment_hash: [u8; 32]) {
                assert!(expected_route.last().unwrap().node.fail_htlc_backwards(&our_payment_hash));
                {
                        let mut added_monitors = expected_route.last().unwrap().chan_monitor.added_monitors.lock().unwrap();
@@ -2843,80 +2935,64 @@ mod tests {
                        ($node: expr, $prev_node: expr, $last_node: expr) => {
                                {
                                        $node.node.handle_update_fail_htlc(&$prev_node.node.get_our_node_id(), &next_msgs.as_ref().unwrap().0).unwrap();
-                                       let revoke_and_commit = $node.node.handle_commitment_signed(&$prev_node.node.get_our_node_id(), &next_msgs.as_ref().unwrap().1).unwrap();
-
-                                       {
-                                               let mut added_monitors = $node.chan_monitor.added_monitors.lock().unwrap();
-                                               assert_eq!(added_monitors.len(), 1);
-                                               added_monitors.clear();
-                                       }
-                                       assert!($prev_node.node.handle_revoke_and_ack(&$node.node.get_our_node_id(), &revoke_and_commit.0).unwrap().is_none());
-                                       {
-                                               let mut added_monitors = $prev_node.chan_monitor.added_monitors.lock().unwrap();
-                                               assert_eq!(added_monitors.len(), 1);
-                                               added_monitors.clear();
-                                       }
-                                       let revoke_and_ack = $prev_node.node.handle_commitment_signed(&$node.node.get_our_node_id(), &revoke_and_commit.1.unwrap()).unwrap();
-                                       {
-                                               let mut added_monitors = $prev_node.chan_monitor.added_monitors.lock().unwrap();
-                                               assert_eq!(added_monitors.len(), 1);
-                                               added_monitors.clear();
-                                       }
-                                       assert!(revoke_and_ack.1.is_none());
-                                       assert!($node.node.get_and_clear_pending_events().is_empty());
-                                       assert!($node.node.handle_revoke_and_ack(&$prev_node.node.get_our_node_id(), &revoke_and_ack.0).unwrap().is_none());
-                                       {
-                                               let mut added_monitors = $node.chan_monitor.added_monitors.lock().unwrap();
-                                               if $last_node {
-                                                       assert_eq!(added_monitors.len(), 1);
-                                               } else {
-                                                       assert_eq!(added_monitors.len(), 2);
-                                                       assert!(added_monitors[0].0 != added_monitors[1].0);
-                                               }
-                                               added_monitors.clear();
-                                       }
+                                       commitment_signed_dance!($node, $prev_node, next_msgs.as_ref().unwrap().1, !$last_node);
                                }
                        }
                }
 
                let mut expected_next_node = expected_route.last().unwrap().node.get_our_node_id();
                let mut prev_node = expected_route.last().unwrap();
-               for node in expected_route.iter().rev() {
+               for (idx, node) in expected_route.iter().rev().enumerate() {
                        assert_eq!(expected_next_node, node.node.get_our_node_id());
                        if next_msgs.is_some() {
-                               update_fail_dance!(node, prev_node, false);
+                               // We may be the "last node" for the purpose of the commitment dance if we're
+                               // skipping the last node (implying it is disconnected) and we're the
+                               // second-to-last node!
+                               update_fail_dance!(node, prev_node, skip_last && idx == expected_route.len() - 1);
                        }
 
                        let events = node.node.get_and_clear_pending_events();
-                       assert_eq!(events.len(), 1);
-                       match events[0] {
-                               Event::UpdateHTLCs { ref node_id, updates: msgs::CommitmentUpdate { ref update_add_htlcs, ref update_fulfill_htlcs, ref update_fail_htlcs, ref update_fail_malformed_htlcs, ref commitment_signed } } => {
-                                       assert!(update_add_htlcs.is_empty());
-                                       assert!(update_fulfill_htlcs.is_empty());
-                                       assert_eq!(update_fail_htlcs.len(), 1);
-                                       assert!(update_fail_malformed_htlcs.is_empty());
-                                       expected_next_node = node_id.clone();
-                                       next_msgs = Some((update_fail_htlcs[0].clone(), commitment_signed.clone()));
-                               },
-                               _ => panic!("Unexpected event"),
-                       };
+                       if !skip_last || idx != expected_route.len() - 1 {
+                               assert_eq!(events.len(), 1);
+                               match events[0] {
+                                       Event::UpdateHTLCs { ref node_id, updates: msgs::CommitmentUpdate { ref update_add_htlcs, ref update_fulfill_htlcs, ref update_fail_htlcs, ref update_fail_malformed_htlcs, ref commitment_signed } } => {
+                                               assert!(update_add_htlcs.is_empty());
+                                               assert!(update_fulfill_htlcs.is_empty());
+                                               assert_eq!(update_fail_htlcs.len(), 1);
+                                               assert!(update_fail_malformed_htlcs.is_empty());
+                                               expected_next_node = node_id.clone();
+                                               next_msgs = Some((update_fail_htlcs[0].clone(), commitment_signed.clone()));
+                                       },
+                                       _ => panic!("Unexpected event"),
+                               }
+                       } else {
+                               assert!(events.is_empty());
+                       }
+                       if !skip_last && idx == expected_route.len() - 1 {
+                               assert_eq!(expected_next_node, origin_node.node.get_our_node_id());
+                       }
 
                        prev_node = node;
                }
 
-               assert_eq!(expected_next_node, origin_node.node.get_our_node_id());
-               update_fail_dance!(origin_node, expected_route.first().unwrap(), true);
+               if !skip_last {
+                       update_fail_dance!(origin_node, expected_route.first().unwrap(), true);
 
-               let events = origin_node.node.get_and_clear_pending_events();
-               assert_eq!(events.len(), 1);
-               match events[0] {
-                       Event::PaymentFailed { payment_hash } => {
-                               assert_eq!(payment_hash, our_payment_hash);
-                       },
-                       _ => panic!("Unexpected event"),
+                       let events = origin_node.node.get_and_clear_pending_events();
+                       assert_eq!(events.len(), 1);
+                       match events[0] {
+                               Event::PaymentFailed { payment_hash } => {
+                                       assert_eq!(payment_hash, our_payment_hash);
+                               },
+                               _ => panic!("Unexpected event"),
+                       }
                }
        }
 
+       fn fail_payment(origin_node: &Node, expected_route: &[&Node], our_payment_hash: [u8; 32]) {
+               fail_payment_along_route(origin_node, expected_route, false, our_payment_hash);
+       }
+
        fn create_network(node_count: usize) -> Vec<Node> {
                let mut nodes = Vec::new();
                let mut rng = thread_rng();
@@ -3057,12 +3133,6 @@ mod tests {
                close_channel(&nodes[2], &nodes[3], &chan_3.2, chan_3.3, true);
                close_channel(&nodes[1], &nodes[3], &chan_4.2, chan_4.3, false);
                close_channel(&nodes[1], &nodes[3], &chan_5.2, chan_5.3, false);
-
-               // Check that we processed all pending events
-               for node in nodes {
-                       assert_eq!(node.node.get_and_clear_pending_events().len(), 0);
-                       assert_eq!(node.chan_monitor.added_monitors.lock().unwrap().len(), 0);
-               }
        }
 
        #[test]
@@ -3372,12 +3442,167 @@ mod tests {
                get_announce_close_broadcast_events(&nodes, 0, 1);
                assert_eq!(nodes[0].node.list_channels().len(), 0);
                assert_eq!(nodes[1].node.list_channels().len(), 0);
+       }
 
-               // Check that we processed all pending events
-               for node in nodes {
-                       assert_eq!(node.node.get_and_clear_pending_events().len(), 0);
-                       assert_eq!(node.chan_monitor.added_monitors.lock().unwrap().len(), 0);
+       #[test]
+       fn test_htlc_ignore_latest_remote_commitment() {
+               // Test that HTLC transactions spending the latest remote commitment transaction are simply
+               // ignored if we cannot claim them. This originally tickled an invalid unwrap().
+               let nodes = create_network(2);
+               create_announced_chan_between_nodes(&nodes, 0, 1);
+
+               route_payment(&nodes[0], &[&nodes[1]], 10000000);
+               nodes[0].node.force_close_channel(&nodes[0].node.list_channels()[0].channel_id);
+               {
+                       let events = nodes[0].node.get_and_clear_pending_events();
+                       assert_eq!(events.len(), 1);
+                       match events[0] {
+                               Event::BroadcastChannelUpdate { msg: msgs::ChannelUpdate { contents: msgs::UnsignedChannelUpdate { flags, .. }, .. } } => {
+                                       assert_eq!(flags & 0b10, 0b10);
+                               },
+                               _ => panic!("Unexpected event"),
+                       }
+               }
+
+               let node_txn = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap();
+               assert_eq!(node_txn.len(), 2);
+
+               let header = BlockHeader { version: 0x20000000, prev_blockhash: Default::default(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 };
+               nodes[1].chain_monitor.block_connected_checked(&header, 1, &[&node_txn[0], &node_txn[1]], &[1; 2]);
+
+               {
+                       let events = nodes[1].node.get_and_clear_pending_events();
+                       assert_eq!(events.len(), 1);
+                       match events[0] {
+                               Event::BroadcastChannelUpdate { msg: msgs::ChannelUpdate { contents: msgs::UnsignedChannelUpdate { flags, .. }, .. } } => {
+                                       assert_eq!(flags & 0b10, 0b10);
+                               },
+                               _ => panic!("Unexpected event"),
+                       }
+               }
+
+               // Duplicate the block_connected call since this may happen due to other listeners
+               // registering new transactions
+               nodes[1].chain_monitor.block_connected_checked(&header, 1, &[&node_txn[0], &node_txn[1]], &[1; 2]);
+       }
+
+       #[test]
+       fn test_force_close_fail_back() {
+               // Check which HTLCs are failed-backwards on channel force-closure
+               let mut nodes = create_network(3);
+               create_announced_chan_between_nodes(&nodes, 0, 1);
+               create_announced_chan_between_nodes(&nodes, 1, 2);
+
+               let route = nodes[0].router.get_route(&nodes[2].node.get_our_node_id(), None, &Vec::new(), 1000000, 42).unwrap();
+
+               let our_payment_preimage = [*nodes[0].network_payment_count.borrow(); 32];
+               *nodes[0].network_payment_count.borrow_mut() += 1;
+               let our_payment_hash = {
+                       let mut sha = Sha256::new();
+                       sha.input(&our_payment_preimage[..]);
+                       let mut ret = [0; 32];
+                       sha.result(&mut ret);
+                       ret
+               };
+
+               let mut payment_event = {
+                       nodes[0].node.send_payment(route, our_payment_hash).unwrap();
+                       {
+                               let mut added_monitors = nodes[0].chan_monitor.added_monitors.lock().unwrap();
+                               assert_eq!(added_monitors.len(), 1);
+                               added_monitors.clear();
+                       }
+
+                       let mut events = nodes[0].node.get_and_clear_pending_events();
+                       assert_eq!(events.len(), 1);
+                       SendEvent::from_event(events.remove(0))
+               };
+
+               nodes[1].node.handle_update_add_htlc(&nodes[0].node.get_our_node_id(), &payment_event.msgs[0]).unwrap();
+               commitment_signed_dance!(nodes[1], nodes[0], payment_event.commitment_msg, false);
+
+               let events_1 = nodes[1].node.get_and_clear_pending_events();
+               assert_eq!(events_1.len(), 1);
+               match events_1[0] {
+                       Event::PendingHTLCsForwardable { .. } => { },
+                       _ => panic!("Unexpected event"),
+               };
+
+               nodes[1].node.channel_state.lock().unwrap().next_forward = Instant::now();
+               nodes[1].node.process_pending_htlc_forwards();
+
+               let mut events_2 = nodes[1].node.get_and_clear_pending_events();
+               assert_eq!(events_2.len(), 1);
+               payment_event = SendEvent::from_event(events_2.remove(0));
+               assert_eq!(payment_event.msgs.len(), 1);
+
+               {
+                       let mut added_monitors = nodes[1].chan_monitor.added_monitors.lock().unwrap();
+                       assert_eq!(added_monitors.len(), 1);
+                       added_monitors.clear();
+               }
+
+               nodes[2].node.handle_update_add_htlc(&nodes[1].node.get_our_node_id(), &payment_event.msgs[0]).unwrap();
+               nodes[2].node.handle_commitment_signed(&nodes[1].node.get_our_node_id(), &payment_event.commitment_msg).unwrap();
+
+               {
+                       let mut added_monitors = nodes[2].chan_monitor.added_monitors.lock().unwrap();
+                       assert_eq!(added_monitors.len(), 1);
+                       added_monitors.clear();
+               }
+
+               // nodes[2] now has the latest commitment transaction, but hasn't revoked its previous
+               // state or updated nodes[1]' state. Now force-close and broadcast that commitment/HTLC
+               // transaction and ensure nodes[1] doesn't fail-backwards (this was originally a bug!).
+
+               nodes[2].node.force_close_channel(&payment_event.commitment_msg.channel_id);
+               let events_3 = nodes[2].node.get_and_clear_pending_events();
+               assert_eq!(events_3.len(), 1);
+               match events_3[0] {
+                       Event::BroadcastChannelUpdate { msg: msgs::ChannelUpdate { contents: msgs::UnsignedChannelUpdate { flags, .. }, .. } } => {
+                               assert_eq!(flags & 0b10, 0b10);
+                       },
+                       _ => panic!("Unexpected event"),
+               }
+
+               let tx = {
+                       let mut node_txn = nodes[2].tx_broadcaster.txn_broadcasted.lock().unwrap();
+                       // Note that we don't bother broadcasting the HTLC-Success transaction here as we don't
+                       // have a use for it unless nodes[2] learns the preimage somehow, the funds will go
+                       // back to nodes[1] upon timeout otherwise.
+                       assert_eq!(node_txn.len(), 1);
+                       node_txn.remove(0)
+               };
+
+               let header = BlockHeader { version: 0x20000000, prev_blockhash: Default::default(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 };
+               nodes[1].chain_monitor.block_connected_checked(&header, 1, &[&tx], &[1]);
+
+               let events_4 = nodes[1].node.get_and_clear_pending_events();
+               // Note no UpdateHTLCs event here from nodes[1] to nodes[0]!
+               assert_eq!(events_4.len(), 1);
+               match events_4[0] {
+                       Event::BroadcastChannelUpdate { msg: msgs::ChannelUpdate { contents: msgs::UnsignedChannelUpdate { flags, .. }, .. } } => {
+                               assert_eq!(flags & 0b10, 0b10);
+                       },
+                       _ => panic!("Unexpected event"),
+               }
+
+               // Now check that if we add the preimage to ChannelMonitor it broadcasts our HTLC-Success..
+               {
+                       let mut monitors = nodes[2].chan_monitor.simple_monitor.monitors.lock().unwrap();
+                       monitors.get_mut(&OutPoint::new(Sha256dHash::from(&payment_event.commitment_msg.channel_id[..]), 0)).unwrap()
+                               .provide_payment_preimage(&our_payment_hash, &our_payment_preimage);
                }
+               nodes[2].chain_monitor.block_connected_checked(&header, 1, &[&tx], &[1]);
+               let node_txn = nodes[2].tx_broadcaster.txn_broadcasted.lock().unwrap();
+               assert_eq!(node_txn.len(), 1);
+               assert_eq!(node_txn[0].input.len(), 1);
+               assert_eq!(node_txn[0].input[0].previous_output.txid, tx.txid());
+               assert_eq!(node_txn[0].lock_time, 0); // Must be an HTLC-Success
+               assert_eq!(node_txn[0].input[0].witness.len(), 5); // Must be an HTLC-Success
+               let mut funding_tx_map = HashMap::new();
+               funding_tx_map.insert(tx.txid(), tx);
+               node_txn[0].verify(&funding_tx_map).unwrap();
        }
 
        #[test]
@@ -3401,11 +3626,164 @@ mod tests {
                while !headers.is_empty() {
                        nodes[0].node.block_disconnected(&headers.pop().unwrap());
                }
+               {
+                       let events = nodes[0].node.get_and_clear_pending_events();
+                       assert_eq!(events.len(), 1);
+                       match events[0] {
+                               Event::BroadcastChannelUpdate { msg: msgs::ChannelUpdate { contents: msgs::UnsignedChannelUpdate { flags, .. }, .. } } => {
+                                       assert_eq!(flags & 0b10, 0b10);
+                               },
+                               _ => panic!("Unexpected event"),
+                       }
+               }
                let channel_state = nodes[0].node.channel_state.lock().unwrap();
                assert_eq!(channel_state.by_id.len(), 0);
                assert_eq!(channel_state.short_to_id.len(), 0);
        }
 
+       fn reconnect_nodes(node_a: &Node, node_b: &Node, pre_all_htlcs: bool, pending_htlc_claims: (usize, usize), pending_htlc_fails: (usize, usize)) {
+               let reestablish_1 = node_a.node.peer_connected(&node_b.node.get_our_node_id());
+               let reestablish_2 = node_b.node.peer_connected(&node_a.node.get_our_node_id());
+
+               let mut resp_1 = Vec::new();
+               for msg in reestablish_1 {
+                       resp_1.push(node_b.node.handle_channel_reestablish(&node_a.node.get_our_node_id(), &msg).unwrap());
+               }
+               {
+                       let mut added_monitors = node_b.chan_monitor.added_monitors.lock().unwrap();
+                       if pending_htlc_claims.0 != 0 || pending_htlc_fails.0 != 0 {
+                               assert_eq!(added_monitors.len(), 1);
+                       } else {
+                               assert!(added_monitors.is_empty());
+                       }
+                       added_monitors.clear();
+               }
+
+               let mut resp_2 = Vec::new();
+               for msg in reestablish_2 {
+                       resp_2.push(node_a.node.handle_channel_reestablish(&node_b.node.get_our_node_id(), &msg).unwrap());
+               }
+               {
+                       let mut added_monitors = node_a.chan_monitor.added_monitors.lock().unwrap();
+                       if pending_htlc_claims.1 != 0 || pending_htlc_fails.1 != 0 {
+                               assert_eq!(added_monitors.len(), 1);
+                       } else {
+                               assert!(added_monitors.is_empty());
+                       }
+                       added_monitors.clear();
+               }
+
+               // We dont yet support both needing updates, as that would require a different commitment dance:
+               assert!((pending_htlc_claims.0 == 0 && pending_htlc_fails.0 == 0) || (pending_htlc_claims.1 == 0 && pending_htlc_fails.1 == 0));
+
+               for chan_msgs in resp_1.drain(..) {
+                       if pre_all_htlcs {
+                               let _announcement_sigs_opt = node_a.node.handle_funding_locked(&node_b.node.get_our_node_id(), &chan_msgs.0.unwrap()).unwrap();
+                               //TODO: Test announcement_sigs re-sending when we've implemented it
+                       } else {
+                               assert!(chan_msgs.0.is_none());
+                       }
+                       assert!(chan_msgs.1.is_none());
+                       if pending_htlc_claims.0 != 0 || pending_htlc_fails.0 != 0 {
+                               let commitment_update = chan_msgs.2.unwrap();
+                               assert!(commitment_update.update_add_htlcs.is_empty()); // We can't relay while disconnected
+                               assert_eq!(commitment_update.update_fulfill_htlcs.len(), pending_htlc_claims.0);
+                               assert_eq!(commitment_update.update_fail_htlcs.len(), pending_htlc_fails.0);
+                               assert!(commitment_update.update_fail_malformed_htlcs.is_empty());
+                               for update_fulfill in commitment_update.update_fulfill_htlcs {
+                                       node_a.node.handle_update_fulfill_htlc(&node_b.node.get_our_node_id(), &update_fulfill).unwrap();
+                               }
+                               for update_fail in commitment_update.update_fail_htlcs {
+                                       node_a.node.handle_update_fail_htlc(&node_b.node.get_our_node_id(), &update_fail).unwrap();
+                               }
+
+                               commitment_signed_dance!(node_a, node_b, commitment_update.commitment_signed, false);
+                       } else {
+                               assert!(chan_msgs.2.is_none());
+                       }
+               }
+
+               for chan_msgs in resp_2.drain(..) {
+                       if pre_all_htlcs {
+                               let _announcement_sigs_opt = node_b.node.handle_funding_locked(&node_a.node.get_our_node_id(), &chan_msgs.0.unwrap()).unwrap();
+                               //TODO: Test announcement_sigs re-sending when we've implemented it
+                       } else {
+                               assert!(chan_msgs.0.is_none());
+                       }
+                       assert!(chan_msgs.1.is_none());
+                       if pending_htlc_claims.1 != 0 || pending_htlc_fails.1 != 0 {
+                               let commitment_update = chan_msgs.2.unwrap();
+                               assert!(commitment_update.update_add_htlcs.is_empty()); // We can't relay while disconnected
+                               assert_eq!(commitment_update.update_fulfill_htlcs.len(), pending_htlc_claims.0);
+                               assert_eq!(commitment_update.update_fail_htlcs.len(), pending_htlc_fails.0);
+                               assert!(commitment_update.update_fail_malformed_htlcs.is_empty());
+                               for update_fulfill in commitment_update.update_fulfill_htlcs {
+                                       node_b.node.handle_update_fulfill_htlc(&node_a.node.get_our_node_id(), &update_fulfill).unwrap();
+                               }
+                               for update_fail in commitment_update.update_fail_htlcs {
+                                       node_b.node.handle_update_fail_htlc(&node_a.node.get_our_node_id(), &update_fail).unwrap();
+                               }
+
+                               commitment_signed_dance!(node_b, node_a, commitment_update.commitment_signed, false);
+                       } else {
+                               assert!(chan_msgs.2.is_none());
+                       }
+               }
+       }
+
+       #[test]
+       fn test_simple_peer_disconnect() {
+               // Test that we can reconnect when there are no lost messages
+               let nodes = create_network(3);
+               create_announced_chan_between_nodes(&nodes, 0, 1);
+               create_announced_chan_between_nodes(&nodes, 1, 2);
+
+               nodes[0].node.peer_disconnected(&nodes[1].node.get_our_node_id(), false);
+               nodes[1].node.peer_disconnected(&nodes[0].node.get_our_node_id(), false);
+               reconnect_nodes(&nodes[0], &nodes[1], true, (0, 0), (0, 0));
+
+               let payment_preimage_1 = route_payment(&nodes[0], &vec!(&nodes[1], &nodes[2])[..], 1000000).0;
+               let payment_hash_2 = route_payment(&nodes[0], &vec!(&nodes[1], &nodes[2])[..], 1000000).1;
+               fail_payment(&nodes[0], &vec!(&nodes[1], &nodes[2]), payment_hash_2);
+               claim_payment(&nodes[0], &vec!(&nodes[1], &nodes[2]), payment_preimage_1);
+
+               nodes[0].node.peer_disconnected(&nodes[1].node.get_our_node_id(), false);
+               nodes[1].node.peer_disconnected(&nodes[0].node.get_our_node_id(), false);
+               reconnect_nodes(&nodes[0], &nodes[1], false, (0, 0), (0, 0));
+
+               let payment_preimage_3 = route_payment(&nodes[0], &vec!(&nodes[1], &nodes[2])[..], 1000000).0;
+               let payment_preimage_4 = route_payment(&nodes[0], &vec!(&nodes[1], &nodes[2])[..], 1000000).0;
+               let payment_hash_5 = route_payment(&nodes[0], &vec!(&nodes[1], &nodes[2])[..], 1000000).1;
+               let payment_hash_6 = route_payment(&nodes[0], &vec!(&nodes[1], &nodes[2])[..], 1000000).1;
+
+               nodes[0].node.peer_disconnected(&nodes[1].node.get_our_node_id(), false);
+               nodes[1].node.peer_disconnected(&nodes[0].node.get_our_node_id(), false);
+
+               claim_payment_along_route(&nodes[0], &vec!(&nodes[1], &nodes[2]), true, payment_preimage_3);
+               fail_payment_along_route(&nodes[0], &[&nodes[1], &nodes[2]], true, payment_hash_5);
+
+               reconnect_nodes(&nodes[0], &nodes[1], false, (1, 0), (1, 0));
+               {
+                       let events = nodes[0].node.get_and_clear_pending_events();
+                       assert_eq!(events.len(), 2);
+                       match events[0] {
+                               Event::PaymentSent { payment_preimage } => {
+                                       assert_eq!(payment_preimage, payment_preimage_3);
+                               },
+                               _ => panic!("Unexpected event"),
+                       }
+                       match events[1] {
+                               Event::PaymentFailed { payment_hash } => {
+                                       assert_eq!(payment_hash, payment_hash_5);
+                               },
+                               _ => panic!("Unexpected event"),
+                       }
+               }
+
+               claim_payment(&nodes[0], &vec!(&nodes[1], &nodes[2]), payment_preimage_4);
+               fail_payment(&nodes[0], &vec!(&nodes[1], &nodes[2]), payment_hash_6);
+       }
+
        #[test]
        fn test_invalid_channel_announcement() {
                //Test BOLT 7 channel_announcement msg requirement for final node, gather data to build customed channel_announcement msgs