Send shutdown/closing_signed msgs out-of-band for ordered delivery
authorMatt Corallo <git@bluematt.me>
Sat, 20 Oct 2018 01:50:16 +0000 (21:50 -0400)
committerMatt Corallo <git@bluematt.me>
Sat, 27 Oct 2018 13:42:04 +0000 (09:42 -0400)
src/ln/channelmanager.rs
src/ln/msgs.rs
src/ln/peer_handler.rs
src/util/events.rs
src/util/test_utils.rs

index 4777a3e810a220349d0f66bbaa921e77e489d9d4..ace557528b3dc875898ee1bc4c8820bab1e6af0c 100644 (file)
@@ -1758,8 +1758,8 @@ impl ChannelManager {
                }
        }
 
-       fn internal_shutdown(&self, their_node_id: &PublicKey, msg: &msgs::Shutdown) -> Result<(Option<msgs::Shutdown>, Option<msgs::ClosingSigned>), MsgHandleErrInternal> {
-               let (mut res, chan_option) = {
+       fn internal_shutdown(&self, their_node_id: &PublicKey, msg: &msgs::Shutdown) -> Result<(), MsgHandleErrInternal> {
+               let (mut dropped_htlcs, chan_option) = {
                        let mut channel_state_lock = self.channel_state.lock().unwrap();
                        let channel_state = channel_state_lock.borrow_parts();
 
@@ -1769,18 +1769,30 @@ impl ChannelManager {
                                                //TODO: here and below MsgHandleErrInternal, #153 case
                                                return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!", msg.channel_id));
                                        }
-                                       let res = chan_entry.get_mut().shutdown(&*self.fee_estimator, &msg).map_err(|e| MsgHandleErrInternal::from_maybe_close(e))?;
+                                       let (shutdown, closing_signed, dropped_htlcs) = chan_entry.get_mut().shutdown(&*self.fee_estimator, &msg).map_err(|e| MsgHandleErrInternal::from_maybe_close(e))?;
+                                       if let Some(msg) = shutdown {
+                                               channel_state.pending_msg_events.push(events::MessageSendEvent::SendShutdown {
+                                                       node_id: their_node_id.clone(),
+                                                       msg,
+                                               });
+                                       }
+                                       if let Some(msg) = closing_signed {
+                                               channel_state.pending_msg_events.push(events::MessageSendEvent::SendClosingSigned {
+                                                       node_id: their_node_id.clone(),
+                                                       msg,
+                                               });
+                                       }
                                        if chan_entry.get().is_shutdown() {
                                                if let Some(short_id) = chan_entry.get().get_short_channel_id() {
                                                        channel_state.short_to_id.remove(&short_id);
                                                }
-                                               (res, Some(chan_entry.remove_entry().1))
-                                       } else { (res, None) }
+                                               (dropped_htlcs, Some(chan_entry.remove_entry().1))
+                                       } else { (dropped_htlcs, None) }
                                },
                                hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel", msg.channel_id))
                        }
                };
-               for htlc_source in res.2.drain(..) {
+               for htlc_source in dropped_htlcs.drain(..) {
                        // unknown_next_peer...I dunno who that is anymore....
                        self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), htlc_source.0, &htlc_source.1, HTLCFailReason::Reason { failure_code: 0x4000 | 10, data: Vec::new() });
                }
@@ -1792,11 +1804,11 @@ impl ChannelManager {
                                });
                        }
                }
-               Ok((res.0, res.1))
+               Ok(())
        }
 
-       fn internal_closing_signed(&self, their_node_id: &PublicKey, msg: &msgs::ClosingSigned) -> Result<Option<msgs::ClosingSigned>, MsgHandleErrInternal> {
-               let (res, chan_option) = {
+       fn internal_closing_signed(&self, their_node_id: &PublicKey, msg: &msgs::ClosingSigned) -> Result<(), MsgHandleErrInternal> {
+               let (tx, chan_option) = {
                        let mut channel_state_lock = self.channel_state.lock().unwrap();
                        let channel_state = channel_state_lock.borrow_parts();
                        match channel_state.by_id.entry(msg.channel_id.clone()) {
@@ -1805,8 +1817,14 @@ impl ChannelManager {
                                                //TODO: here and below MsgHandleErrInternal, #153 case
                                                return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!", msg.channel_id));
                                        }
-                                       let res = chan_entry.get_mut().closing_signed(&*self.fee_estimator, &msg).map_err(|e| MsgHandleErrInternal::from_maybe_close(e))?;
-                                       if res.1.is_some() {
+                                       let (closing_signed, tx) = chan_entry.get_mut().closing_signed(&*self.fee_estimator, &msg).map_err(|e| MsgHandleErrInternal::from_maybe_close(e))?;
+                                       if let Some(msg) = closing_signed {
+                                               channel_state.pending_msg_events.push(events::MessageSendEvent::SendClosingSigned {
+                                                       node_id: their_node_id.clone(),
+                                                       msg,
+                                               });
+                                       }
+                                       if tx.is_some() {
                                                // We're done with this channel, we've got a signed closing transaction and
                                                // will send the closing_signed back to the remote peer upon return. This
                                                // also implies there are no pending HTLCs left on the channel, so we can
@@ -1815,13 +1833,13 @@ impl ChannelManager {
                                                if let Some(short_id) = chan_entry.get().get_short_channel_id() {
                                                        channel_state.short_to_id.remove(&short_id);
                                                }
-                                               (res, Some(chan_entry.remove_entry().1))
-                                       } else { (res, None) }
+                                               (tx, Some(chan_entry.remove_entry().1))
+                                       } else { (tx, None) }
                                },
                                hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel", msg.channel_id))
                        }
                };
-               if let Some(broadcast_tx) = res.1 {
+               if let Some(broadcast_tx) = tx {
                        self.tx_broadcaster.broadcast_transaction(&broadcast_tx);
                }
                if let Some(chan) = chan_option {
@@ -1832,7 +1850,7 @@ impl ChannelManager {
                                });
                        }
                }
-               Ok(res.0)
+               Ok(())
        }
 
        fn internal_update_add_htlc(&self, their_node_id: &PublicKey, msg: &msgs::UpdateAddHTLC) -> Result<(), MsgHandleErrInternal> {
@@ -2495,11 +2513,11 @@ impl ChannelMessageHandler for ChannelManager {
                handle_error!(self, self.internal_funding_locked(their_node_id, msg), their_node_id)
        }
 
-       fn handle_shutdown(&self, their_node_id: &PublicKey, msg: &msgs::Shutdown) -> Result<(Option<msgs::Shutdown>, Option<msgs::ClosingSigned>), HandleError> {
+       fn handle_shutdown(&self, their_node_id: &PublicKey, msg: &msgs::Shutdown) -> Result<(), HandleError> {
                handle_error!(self, self.internal_shutdown(their_node_id, msg), their_node_id)
        }
 
-       fn handle_closing_signed(&self, their_node_id: &PublicKey, msg: &msgs::ClosingSigned) -> Result<Option<msgs::ClosingSigned>, HandleError> {
+       fn handle_closing_signed(&self, their_node_id: &PublicKey, msg: &msgs::ClosingSigned) -> Result<(), HandleError> {
                handle_error!(self, self.internal_closing_signed(their_node_id, msg), their_node_id)
        }
 
@@ -3025,68 +3043,91 @@ mod tests {
        }
 
        fn close_channel(outbound_node: &Node, inbound_node: &Node, channel_id: &[u8; 32], funding_tx: Transaction, close_inbound_first: bool) -> (msgs::ChannelUpdate, msgs::ChannelUpdate) {
-               let (node_a, broadcaster_a) = if close_inbound_first { (&inbound_node.node, &inbound_node.tx_broadcaster) } else { (&outbound_node.node, &outbound_node.tx_broadcaster) };
+               let (node_a, broadcaster_a, struct_a) = if close_inbound_first { (&inbound_node.node, &inbound_node.tx_broadcaster, inbound_node) } else { (&outbound_node.node, &outbound_node.tx_broadcaster, outbound_node) };
                let (node_b, broadcaster_b) = if close_inbound_first { (&outbound_node.node, &outbound_node.tx_broadcaster) } else { (&inbound_node.node, &inbound_node.tx_broadcaster) };
                let (tx_a, tx_b);
 
                node_a.close_channel(channel_id).unwrap();
-               let events_1 = node_a.get_and_clear_pending_msg_events();
-               assert_eq!(events_1.len(), 1);
-               let shutdown_a = match events_1[0] {
+               node_b.handle_shutdown(&node_a.get_our_node_id(), &get_event_msg!(struct_a, MessageSendEvent::SendShutdown, node_b.get_our_node_id())).unwrap();
+
+               let events_1 = node_b.get_and_clear_pending_msg_events();
+               assert!(events_1.len() >= 1);
+               let shutdown_b = match events_1[0] {
                        MessageSendEvent::SendShutdown { ref node_id, ref msg } => {
-                               assert_eq!(node_id, &node_b.get_our_node_id());
+                               assert_eq!(node_id, &node_a.get_our_node_id());
                                msg.clone()
                        },
                        _ => panic!("Unexpected event"),
                };
 
-               let (shutdown_b, mut closing_signed_b) = node_b.handle_shutdown(&node_a.get_our_node_id(), &shutdown_a).unwrap();
-               if !close_inbound_first {
-                       assert!(closing_signed_b.is_none());
+               let closing_signed_b = if !close_inbound_first {
+                       assert_eq!(events_1.len(), 1);
+                       None
+               } else {
+                       Some(match events_1[1] {
+                               MessageSendEvent::SendClosingSigned { ref node_id, ref msg } => {
+                                       assert_eq!(node_id, &node_a.get_our_node_id());
+                                       msg.clone()
+                               },
+                               _ => panic!("Unexpected event"),
+                       })
+               };
+
+               macro_rules! get_closing_signed_broadcast {
+                       ($node: expr, $dest_pubkey: expr) => {
+                               {
+                                       let events = $node.get_and_clear_pending_msg_events();
+                                       assert!(events.len() == 1 || events.len() == 2);
+                                       (match events[events.len() - 1] {
+                                               MessageSendEvent::BroadcastChannelUpdate { ref msg } => {
+                                                       msg.clone()
+                                               },
+                                               _ => panic!("Unexpected event"),
+                                       }, if events.len() == 2 {
+                                               match events[0] {
+                                                       MessageSendEvent::SendClosingSigned { ref node_id, ref msg } => {
+                                                               assert_eq!(*node_id, $dest_pubkey);
+                                                               Some(msg.clone())
+                                                       },
+                                                       _ => panic!("Unexpected event"),
+                                               }
+                                       } else { None })
+                               }
+                       }
                }
-               let (empty_a, mut closing_signed_a) = node_a.handle_shutdown(&node_b.get_our_node_id(), &shutdown_b.unwrap()).unwrap();
-               assert!(empty_a.is_none());
-               if close_inbound_first {
-                       assert!(closing_signed_a.is_none());
-                       closing_signed_a = node_a.handle_closing_signed(&node_b.get_our_node_id(), &closing_signed_b.unwrap()).unwrap();
+
+               node_a.handle_shutdown(&node_b.get_our_node_id(), &shutdown_b).unwrap();
+               let (as_update, bs_update) = if close_inbound_first {
+                       assert!(node_a.get_and_clear_pending_msg_events().is_empty());
+                       node_a.handle_closing_signed(&node_b.get_our_node_id(), &closing_signed_b.unwrap()).unwrap();
                        assert_eq!(broadcaster_a.txn_broadcasted.lock().unwrap().len(), 1);
                        tx_a = broadcaster_a.txn_broadcasted.lock().unwrap().remove(0);
+                       let (as_update, closing_signed_a) = get_closing_signed_broadcast!(node_a, node_b.get_our_node_id());
 
-                       let empty_b = node_b.handle_closing_signed(&node_a.get_our_node_id(), &closing_signed_a.unwrap()).unwrap();
-                       assert!(empty_b.is_none());
+                       node_b.handle_closing_signed(&node_a.get_our_node_id(), &closing_signed_a.unwrap()).unwrap();
+                       let (bs_update, none_b) = get_closing_signed_broadcast!(node_b, node_a.get_our_node_id());
+                       assert!(none_b.is_none());
                        assert_eq!(broadcaster_b.txn_broadcasted.lock().unwrap().len(), 1);
                        tx_b = broadcaster_b.txn_broadcasted.lock().unwrap().remove(0);
+                       (as_update, bs_update)
                } else {
-                       closing_signed_b = node_b.handle_closing_signed(&node_a.get_our_node_id(), &closing_signed_a.unwrap()).unwrap();
+                       let closing_signed_a = get_event_msg!(struct_a, MessageSendEvent::SendClosingSigned, node_b.get_our_node_id());
+
+                       node_b.handle_closing_signed(&node_a.get_our_node_id(), &closing_signed_a).unwrap();
                        assert_eq!(broadcaster_b.txn_broadcasted.lock().unwrap().len(), 1);
                        tx_b = broadcaster_b.txn_broadcasted.lock().unwrap().remove(0);
+                       let (bs_update, closing_signed_b) = get_closing_signed_broadcast!(node_b, node_a.get_our_node_id());
 
-                       let empty_a2 = node_a.handle_closing_signed(&node_b.get_our_node_id(), &closing_signed_b.unwrap()).unwrap();
-                       assert!(empty_a2.is_none());
+                       node_a.handle_closing_signed(&node_b.get_our_node_id(), &closing_signed_b.unwrap()).unwrap();
+                       let (as_update, none_a) = get_closing_signed_broadcast!(node_a, node_b.get_our_node_id());
+                       assert!(none_a.is_none());
                        assert_eq!(broadcaster_a.txn_broadcasted.lock().unwrap().len(), 1);
                        tx_a = broadcaster_a.txn_broadcasted.lock().unwrap().remove(0);
-               }
+                       (as_update, bs_update)
+               };
                assert_eq!(tx_a, tx_b);
                check_spends!(tx_a, funding_tx);
 
-               let events_2 = node_a.get_and_clear_pending_msg_events();
-               assert_eq!(events_2.len(), 1);
-               let as_update = match events_2[0] {
-                       MessageSendEvent::BroadcastChannelUpdate { ref msg } => {
-                               msg.clone()
-                       },
-                       _ => panic!("Unexpected event"),
-               };
-
-               let events_3 = node_b.get_and_clear_pending_msg_events();
-               assert_eq!(events_3.len(), 1);
-               let bs_update = match events_3[0] {
-                       MessageSendEvent::BroadcastChannelUpdate { ref msg } => {
-                               msg.clone()
-                       },
-                       _ => panic!("Unexpected event"),
-               };
-
                (as_update, bs_update)
        }
 
index d01595b72164bfe09917144f2bf47c54a0c1809b..b8fdeb1299491dbba9f74e9d005e94168220dfbc 100644 (file)
@@ -235,12 +235,14 @@ pub struct FundingLocked {
 }
 
 /// A shutdown message to be sent or received from a peer
+#[derive(Clone)]
 pub struct Shutdown {
        pub(crate) channel_id: [u8; 32],
        pub(crate) scriptpubkey: Script,
 }
 
 /// A closing_signed message to be sent or received from a peer
+#[derive(Clone)]
 pub struct ClosingSigned {
        pub(crate) channel_id: [u8; 32],
        pub(crate) fee_satoshis: u64,
@@ -540,9 +542,9 @@ pub trait ChannelMessageHandler : events::MessageSendEventsProvider + Send + Syn
 
        // Channl close:
        /// Handle an incoming shutdown message from the given peer.
-       fn handle_shutdown(&self, their_node_id: &PublicKey, msg: &Shutdown) -> Result<(Option<Shutdown>, Option<ClosingSigned>), HandleError>;
+       fn handle_shutdown(&self, their_node_id: &PublicKey, msg: &Shutdown) -> Result<(), HandleError>;
        /// Handle an incoming closing_signed message from the given peer.
-       fn handle_closing_signed(&self, their_node_id: &PublicKey, msg: &ClosingSigned) -> Result<Option<ClosingSigned>, HandleError>;
+       fn handle_closing_signed(&self, their_node_id: &PublicKey, msg: &ClosingSigned) -> Result<(), HandleError>;
 
        // HTLC handling:
        /// Handle an incoming update_add_htlc message from the given peer.
index 9a2cee9dd4931ec01cd579250edb917ff2c4ee8d..5f1a8dc4034174830d851e4a4867db36f7241586 100644 (file)
@@ -581,20 +581,11 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
 
                                                                                        38 => {
                                                                                                let msg = try_potential_decodeerror!(msgs::Shutdown::read(&mut reader));
-                                                                                               let resp_options = try_potential_handleerror!(self.message_handler.chan_handler.handle_shutdown(&peer.their_node_id.unwrap(), &msg));
-                                                                                               if let Some(resp) = resp_options.0 {
-                                                                                                       encode_and_send_msg!(resp, 38);
-                                                                                               }
-                                                                                               if let Some(resp) = resp_options.1 {
-                                                                                                       encode_and_send_msg!(resp, 39);
-                                                                                               }
+                                                                                               try_potential_handleerror!(self.message_handler.chan_handler.handle_shutdown(&peer.their_node_id.unwrap(), &msg));
                                                                                        },
                                                                                        39 => {
                                                                                                let msg = try_potential_decodeerror!(msgs::ClosingSigned::read(&mut reader));
-                                                                                               let resp_option = try_potential_handleerror!(self.message_handler.chan_handler.handle_closing_signed(&peer.their_node_id.unwrap(), &msg));
-                                                                                               if let Some(resp) = resp_option {
-                                                                                                       encode_and_send_msg!(resp, 39);
-                                                                                               }
+                                                                                               try_potential_handleerror!(self.message_handler.chan_handler.handle_closing_signed(&peer.their_node_id.unwrap(), &msg));
                                                                                        },
 
                                                                                        128 => {
@@ -883,6 +874,16 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
                                                peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 133)));
                                                Self::do_attempt_write_data(&mut descriptor, peer);
                                        },
+                                       MessageSendEvent::SendClosingSigned { ref node_id, ref msg } => {
+                                               log_trace!(self, "Handling SendClosingSigned event in peer_handler for node {} for channel {}",
+                                                               log_pubkey!(node_id),
+                                                               log_bytes!(msg.channel_id));
+                                               let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, {
+                                                               //TODO: Do whatever we're gonna do for handling dropped messages
+                                                       });
+                                               peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 39)));
+                                               Self::do_attempt_write_data(&mut descriptor, peer);
+                                       },
                                        MessageSendEvent::SendShutdown { ref node_id, ref msg } => {
                                                log_trace!(self, "Handling Shutdown event in peer_handler for node {} for channel {}",
                                                                log_pubkey!(node_id),
index d7312cd9129b1cadd92c82bd09f2e5233f558d53..cfe16151a22266cfc5db7b74d8069f92f6b5983e 100644 (file)
@@ -161,6 +161,13 @@ pub enum MessageSendEvent {
                /// The message which should be sent.
                msg: msgs::RevokeAndACK,
        },
+       /// Used to indicate that a closing_signed message should be sent to the peer with the given node_id.
+       SendClosingSigned {
+               /// The node_id of the node which should receive this message
+               node_id: PublicKey,
+               /// The message which should be sent.
+               msg: msgs::ClosingSigned,
+       },
        /// Used to indicate that a shutdown message should be sent to the peer with the given node_id.
        SendShutdown {
                /// The node_id of the node which should receive this message
index f8959dcf17b9b2b2b46b5db14c44df8a27390a82..123ddd06facaeb06240deb3ff244acc3b2833e60 100644 (file)
@@ -101,10 +101,10 @@ impl msgs::ChannelMessageHandler for TestChannelMessageHandler {
        fn handle_funding_locked(&self, _their_node_id: &PublicKey, _msg: &msgs::FundingLocked) -> Result<(), HandleError> {
                Err(HandleError { err: "", action: None })
        }
-       fn handle_shutdown(&self, _their_node_id: &PublicKey, _msg: &msgs::Shutdown) -> Result<(Option<msgs::Shutdown>, Option<msgs::ClosingSigned>), HandleError> {
+       fn handle_shutdown(&self, _their_node_id: &PublicKey, _msg: &msgs::Shutdown) -> Result<(), HandleError> {
                Err(HandleError { err: "", action: None })
        }
-       fn handle_closing_signed(&self, _their_node_id: &PublicKey, _msg: &msgs::ClosingSigned) -> Result<Option<msgs::ClosingSigned>, HandleError> {
+       fn handle_closing_signed(&self, _their_node_id: &PublicKey, _msg: &msgs::ClosingSigned) -> Result<(), HandleError> {
                Err(HandleError { err: "", action: None })
        }
        fn handle_update_add_htlc(&self, _their_node_id: &PublicKey, _msg: &msgs::UpdateAddHTLC) -> Result<(), HandleError> {