Merge pull request #178 from TheBlueMatt/2018-09-channel_reestablish
diff --git a/src/ln/channelmanager.rs b/src/ln/channelmanager.rs
index af73cc4226c7664388b440a294e7496602170991..e7c6965e94e1c69575c75c82da3b04ae67e9314d 100644
--- a/src/ln/channelmanager.rs
+++ b/src/ln/channelmanager.rs
@@ -1902,7 +1902,27 @@ impl ChannelManager {
                Ok(())
        }
 
-
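+       // Handle an incoming channel_reestablish: work out which of funding_locked,
+       // revoke_and_ack, and commitment_update we need to re-send to get the channel
+       // back in sync with our counterparty, returning any of them that apply.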
+       fn internal_channel_reestablish(&self, their_node_id: &PublicKey, msg: &msgs::ChannelReestablish) -> Result<(Option<msgs::FundingLocked>, Option<msgs::RevokeAndACK>, Option<msgs::CommitmentUpdate>), MsgHandleErrInternal> {
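+               // Scope the channel_state lock so that it is dropped before we call into
+               // the channel monitor below.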
+               let (res, chan_monitor) = {
+                       let mut channel_state = self.channel_state.lock().unwrap();
+                       match channel_state.by_id.get_mut(&msg.channel_id) {
+                               Some(chan) => {
+                                       if chan.get_their_node_id() != *their_node_id {
+                                               return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!", msg.channel_id));
+                                       }
+                                       let (funding_locked, revoke_and_ack, commitment_update, channel_monitor) = chan.channel_reestablish(msg).map_err(|e| MsgHandleErrInternal::from_maybe_close(e))?;
+                                       (Ok((funding_locked, revoke_and_ack, commitment_update)), channel_monitor)
+                               },
+                               None => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel", msg.channel_id))
+                       }
+               };
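+               // With the channel_state lock released, hand any updated monitor to the
+               // ManyChannelMonitor. Failure handling here is not yet implemented.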
+               if let Some(monitor) = chan_monitor {
+                       if let Err(_e) = self.monitor.add_update_monitor(monitor.get_funding_txo().unwrap(), monitor) {
+                               unimplemented!();
+                       }
+               }
+               res
+       }
 }
 
 impl events::EventsProvider for ChannelManager {
@@ -2123,9 +2143,14 @@ impl ChannelMessageHandler for ChannelManager {
                handle_error!(self, self.internal_announcement_signatures(their_node_id, msg), their_node_id)
        }
 
+       fn handle_channel_reestablish(&self, their_node_id: &PublicKey, msg: &msgs::ChannelReestablish) -> Result<(Option<msgs::FundingLocked>, Option<msgs::RevokeAndACK>, Option<msgs::CommitmentUpdate>), HandleError> {
+               handle_error!(self, self.internal_channel_reestablish(their_node_id, msg), their_node_id)
+       }
+
        fn peer_disconnected(&self, their_node_id: &PublicKey, no_connection_possible: bool) {
                let mut new_events = Vec::new();
                let mut failed_channels = Vec::new();
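+               // Uncommitted outbound HTLC adds which we must fail backwards (with the
+               // latest channel_update attached) once the channel_state lock is released.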
+               let mut failed_payments = Vec::new();
                {
                        let mut channel_state_lock = self.channel_state.lock().unwrap();
                        let channel_state = channel_state_lock.borrow_parts();
@@ -2148,13 +2173,23 @@ impl ChannelMessageHandler for ChannelManager {
                                        }
                                });
                        } else {
-                               for chan in channel_state.by_id {
-                                       if chan.1.get_their_node_id() == *their_node_id {
-                                               //TODO: mark channel disabled (and maybe announce such after a timeout). Also
-                                               //fail and wipe any uncommitted outbound HTLCs as those are considered after
-                                               //reconnect.
+                               channel_state.by_id.retain(|_, chan| {
+                                       if chan.get_their_node_id() == *their_node_id {
+                                               //TODO: mark channel disabled (and maybe announce such after a timeout).
+                                               let failed_adds = chan.remove_uncommitted_htlcs_and_mark_paused();
+                                               if !failed_adds.is_empty() {
+                                                       let chan_update = self.get_channel_update(&chan).map(|u| u.encode_with_len()).unwrap(); // Cannot add/recv HTLCs before we have a short_id so unwrap is safe
+                                                       failed_payments.push((chan_update, failed_adds));
+                                               }
+                                               if chan.is_shutdown() {
+                                                       if let Some(short_id) = chan.get_short_channel_id() {
+                                                               short_to_id.remove(&short_id);
+                                                       }
+                                                       return false;
+                                               }
                                        }
-                               }
+                                       true
+                               })
                        }
                }
                for failure in failed_channels.drain(..) {
@@ -2166,6 +2201,32 @@ impl ChannelMessageHandler for ChannelManager {
                                pending_events.push(event);
                        }
                }
+               for (chan_update, mut htlc_sources) in failed_payments {
+                       for (htlc_source, payment_hash) in htlc_sources.drain(..) {
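+                               // 0x1000|7 is temporary_channel_failure (the UPDATE flag plus
+                               // error code 7 per BOLT 4), carrying our latest channel_update
+                               // back towards the payment sender.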
+                               self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), htlc_source, &payment_hash, HTLCFailReason::Reason { failure_code: 0x1000 | 7, data: chan_update.clone() });
+                       }
+               }
+       }
+
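+       // On reconnection, build the channel_reestablish messages which we need to send
+       // to this peer for each channel we still have open with them.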
+       fn peer_connected(&self, their_node_id: &PublicKey) -> Vec<msgs::ChannelReestablish> {
+               let mut res = Vec::new();
+               let mut channel_state = self.channel_state.lock().unwrap();
+               channel_state.by_id.retain(|_, chan| {
+                       if chan.get_their_node_id() == *their_node_id {
+                               if !chan.have_received_message() {
+                                       // If we created this (outbound) channel while we were disconnected from the
+                                       // peer we probably failed to send the open_channel message, which is now
+                                       // lost. We can't have had anything pending related to this channel, so we just
+                                       // drop it.
+                                       false
+                               } else {
+                                       res.push(chan.get_channel_reestablish());
+                                       true
+                               }
+                       } else { true }
+               });
+               //TODO: Also re-broadcast announcement_signatures
+               res
        }
 
        fn handle_error(&self, their_node_id: &PublicKey, msg: &msgs::ErrorMessage) {
@@ -3577,6 +3638,149 @@ mod tests {
                assert_eq!(channel_state.short_to_id.len(), 0);
        }
 
+       fn reconnect_nodes(node_a: &Node, node_b: &Node, pre_all_htlcs: bool, pending_htlc_claims: (usize, usize), pending_htlc_fails: (usize, usize)) {
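+               // Exchange channel_reestablish messages and relay the responses.
+               // pre_all_htlcs indicates funding_locked still needs to be delivered;
+               // pending_htlc_claims/pending_htlc_fails give the number of update_fulfill/
+               // update_fail retransmissions expected in each direction.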
+               let reestablish_1 = node_a.node.peer_connected(&node_b.node.get_our_node_id());
+               let reestablish_2 = node_b.node.peer_connected(&node_a.node.get_our_node_id());
+
+               let mut resp_1 = Vec::new();
+               for msg in reestablish_1 {
+                       resp_1.push(node_b.node.handle_channel_reestablish(&node_a.node.get_our_node_id(), &msg).unwrap());
+               }
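+               // If node_b has fulfills or fails to retransmit it will have persisted an
+               // updated channel monitor while building its responses above.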
+               {
+                       let mut added_monitors = node_b.chan_monitor.added_monitors.lock().unwrap();
+                       if pending_htlc_claims.0 != 0 || pending_htlc_fails.0 != 0 {
+                               assert_eq!(added_monitors.len(), 1);
+                       } else {
+                               assert!(added_monitors.is_empty());
+                       }
+                       added_monitors.clear();
+               }
+
+               let mut resp_2 = Vec::new();
+               for msg in reestablish_2 {
+                       resp_2.push(node_a.node.handle_channel_reestablish(&node_b.node.get_our_node_id(), &msg).unwrap());
+               }
+               {
+                       let mut added_monitors = node_a.chan_monitor.added_monitors.lock().unwrap();
+                       if pending_htlc_claims.1 != 0 || pending_htlc_fails.1 != 0 {
+                               assert_eq!(added_monitors.len(), 1);
+                       } else {
+                               assert!(added_monitors.is_empty());
+                       }
+                       added_monitors.clear();
+               }
+
+               // We don't yet support both sides needing updates, as that would require a different commitment dance:
+               assert!((pending_htlc_claims.0 == 0 && pending_htlc_fails.0 == 0) || (pending_htlc_claims.1 == 0 && pending_htlc_fails.1 == 0));
+
+               for chan_msgs in resp_1.drain(..) {
+                       if pre_all_htlcs {
+                               let _announcement_sigs_opt = node_a.node.handle_funding_locked(&node_b.node.get_our_node_id(), &chan_msgs.0.unwrap()).unwrap();
+                               //TODO: Test announcement_sigs re-sending when we've implemented it
+                       } else {
+                               assert!(chan_msgs.0.is_none());
+                       }
+                       assert!(chan_msgs.1.is_none());
+                       if pending_htlc_claims.0 != 0 || pending_htlc_fails.0 != 0 {
+                               let commitment_update = chan_msgs.2.unwrap();
+                               assert!(commitment_update.update_add_htlcs.is_empty()); // We can't relay while disconnected
+                               assert_eq!(commitment_update.update_fulfill_htlcs.len(), pending_htlc_claims.0);
+                               assert_eq!(commitment_update.update_fail_htlcs.len(), pending_htlc_fails.0);
+                               assert!(commitment_update.update_fail_malformed_htlcs.is_empty());
+                               for update_fulfill in commitment_update.update_fulfill_htlcs {
+                                       node_a.node.handle_update_fulfill_htlc(&node_b.node.get_our_node_id(), &update_fulfill).unwrap();
+                               }
+                               for update_fail in commitment_update.update_fail_htlcs {
+                                       node_a.node.handle_update_fail_htlc(&node_b.node.get_our_node_id(), &update_fail).unwrap();
+                               }
+
+                               commitment_signed_dance!(node_a, node_b, commitment_update.commitment_signed, false);
+                       } else {
+                               assert!(chan_msgs.2.is_none());
+                       }
+               }
+
+               for chan_msgs in resp_2.drain(..) {
+                       if pre_all_htlcs {
+                               let _announcement_sigs_opt = node_b.node.handle_funding_locked(&node_a.node.get_our_node_id(), &chan_msgs.0.unwrap()).unwrap();
+                               //TODO: Test announcement_sigs re-sending when we've implemented it
+                       } else {
+                               assert!(chan_msgs.0.is_none());
+                       }
+                       assert!(chan_msgs.1.is_none());
+                       if pending_htlc_claims.1 != 0 || pending_htlc_fails.1 != 0 {
+                               let commitment_update = chan_msgs.2.unwrap();
+                               assert!(commitment_update.update_add_htlcs.is_empty()); // We can't relay while disconnected
+                               assert_eq!(commitment_update.update_fulfill_htlcs.len(), pending_htlc_claims.1);
+                               assert_eq!(commitment_update.update_fail_htlcs.len(), pending_htlc_fails.1);
+                               assert!(commitment_update.update_fail_malformed_htlcs.is_empty());
+                               for update_fulfill in commitment_update.update_fulfill_htlcs {
+                                       node_b.node.handle_update_fulfill_htlc(&node_a.node.get_our_node_id(), &update_fulfill).unwrap();
+                               }
+                               for update_fail in commitment_update.update_fail_htlcs {
+                                       node_b.node.handle_update_fail_htlc(&node_a.node.get_our_node_id(), &update_fail).unwrap();
+                               }
+
+                               commitment_signed_dance!(node_b, node_a, commitment_update.commitment_signed, false);
+                       } else {
+                               assert!(chan_msgs.2.is_none());
+                       }
+               }
+       }
+
+       #[test]
+       fn test_simple_peer_disconnect() {
+               // Test that we can reconnect when there are no lost messages
+               let nodes = create_network(3);
+               create_announced_chan_between_nodes(&nodes, 0, 1);
+               create_announced_chan_between_nodes(&nodes, 1, 2);
+
+               nodes[0].node.peer_disconnected(&nodes[1].node.get_our_node_id(), false);
+               nodes[1].node.peer_disconnected(&nodes[0].node.get_our_node_id(), false);
+               reconnect_nodes(&nodes[0], &nodes[1], true, (0, 0), (0, 0));
+
+               let payment_preimage_1 = route_payment(&nodes[0], &vec!(&nodes[1], &nodes[2])[..], 1000000).0;
+               let payment_hash_2 = route_payment(&nodes[0], &vec!(&nodes[1], &nodes[2])[..], 1000000).1;
+               fail_payment(&nodes[0], &vec!(&nodes[1], &nodes[2]), payment_hash_2);
+               claim_payment(&nodes[0], &vec!(&nodes[1], &nodes[2]), payment_preimage_1);
+
+               nodes[0].node.peer_disconnected(&nodes[1].node.get_our_node_id(), false);
+               nodes[1].node.peer_disconnected(&nodes[0].node.get_our_node_id(), false);
+               reconnect_nodes(&nodes[0], &nodes[1], false, (0, 0), (0, 0));
+
+               let payment_preimage_3 = route_payment(&nodes[0], &vec!(&nodes[1], &nodes[2])[..], 1000000).0;
+               let payment_preimage_4 = route_payment(&nodes[0], &vec!(&nodes[1], &nodes[2])[..], 1000000).0;
+               let payment_hash_5 = route_payment(&nodes[0], &vec!(&nodes[1], &nodes[2])[..], 1000000).1;
+               let payment_hash_6 = route_payment(&nodes[0], &vec!(&nodes[1], &nodes[2])[..], 1000000).1;
+
+               nodes[0].node.peer_disconnected(&nodes[1].node.get_our_node_id(), false);
+               nodes[1].node.peer_disconnected(&nodes[0].node.get_our_node_id(), false);
+
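+               // Claim and fail payments while the peers are disconnected; the resulting
+               // update_fulfill/update_fail must be retransmitted on reconnect below.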
+               claim_payment_along_route(&nodes[0], &vec!(&nodes[1], &nodes[2]), true, payment_preimage_3);
+               fail_payment_along_route(&nodes[0], &[&nodes[1], &nodes[2]], true, payment_hash_5);
+
+               reconnect_nodes(&nodes[0], &nodes[1], false, (1, 0), (1, 0));
+               {
+                       let events = nodes[0].node.get_and_clear_pending_events();
+                       assert_eq!(events.len(), 2);
+                       match events[0] {
+                               Event::PaymentSent { payment_preimage } => {
+                                       assert_eq!(payment_preimage, payment_preimage_3);
+                               },
+                               _ => panic!("Unexpected event"),
+                       }
+                       match events[1] {
+                               Event::PaymentFailed { payment_hash } => {
+                                       assert_eq!(payment_hash, payment_hash_5);
+                               },
+                               _ => panic!("Unexpected event"),
+                       }
+               }
+
+               claim_payment(&nodes[0], &vec!(&nodes[1], &nodes[2]), payment_preimage_4);
+               fail_payment(&nodes[0], &vec!(&nodes[1], &nodes[2]), payment_hash_6);
+       }
+
        #[test]
        fn test_invalid_channel_announcement() {
                // Test BOLT 7 channel_announcement msg requirement for the final node; gather data to build custom channel_announcement msgs