Send channel_reestablish out-of-band to ensure ordered deliver
authorMatt Corallo <git@bluematt.me>
Sat, 20 Oct 2018 21:50:34 +0000 (17:50 -0400)
committerMatt Corallo <git@bluematt.me>
Sat, 27 Oct 2018 13:42:04 +0000 (09:42 -0400)
src/ln/channelmanager.rs
src/ln/msgs.rs
src/ln/peer_handler.rs
src/util/events.rs
src/util/test_utils.rs

index 7d11fe987f2c67d07a8a3fbc2cbe43bb3df1a7e0..c5356e291789215d98ebbdae6cfa676dcf6d01f7 100644 (file)
@@ -2676,9 +2676,10 @@ impl ChannelMessageHandler for ChannelManager {
                }
        }
 
-       fn peer_connected(&self, their_node_id: &PublicKey) -> Vec<msgs::ChannelReestablish> {
-               let mut res = Vec::new();
-               let mut channel_state = self.channel_state.lock().unwrap();
+       fn peer_connected(&self, their_node_id: &PublicKey) {
+               let mut channel_state_lock = self.channel_state.lock().unwrap();
+               let channel_state = channel_state_lock.borrow_parts();
+               let pending_msg_events = channel_state.pending_msg_events;
                channel_state.by_id.retain(|_, chan| {
                        if chan.get_their_node_id() == *their_node_id {
                                if !chan.have_received_message() {
@@ -2688,13 +2689,15 @@ impl ChannelMessageHandler for ChannelManager {
                                        // drop it.
                                        false
                                } else {
-                                       res.push(chan.get_channel_reestablish());
+                                       pending_msg_events.push(events::MessageSendEvent::SendChannelReestablish {
+                                               node_id: chan.get_their_node_id(),
+                                               msg: chan.get_channel_reestablish(),
+                                       });
                                        true
                                }
                        } else { true }
                });
                //TODO: Also re-broadcast announcement_signatures
-               res
        }
 
        fn handle_error(&self, their_node_id: &PublicKey, msg: &msgs::ErrorMessage) {
@@ -5197,6 +5200,23 @@ mod tests {
                assert_eq!(channel_state.short_to_id.len(), 0);
        }
 
+       macro_rules! get_chan_reestablish_msgs {
+               ($src_node: expr, $dst_node: expr) => {
+                       {
+                               let mut res = Vec::with_capacity(1);
+                               for msg in $src_node.node.get_and_clear_pending_msg_events() {
+                                       if let MessageSendEvent::SendChannelReestablish { ref node_id, ref msg } = msg {
+                                               assert_eq!(*node_id, $dst_node.node.get_our_node_id());
+                                               res.push(msg.clone());
+                                       } else {
+                                               panic!("Unexpected event")
+                                       }
+                               }
+                               res
+                       }
+               }
+       }
+
        macro_rules! handle_chan_reestablish_msgs {
                ($src_node: expr, $dst_node: expr) => {
                        {
@@ -5255,8 +5275,10 @@ mod tests {
        /// pending_htlc_adds includes both the holding cell and in-flight update_add_htlcs, whereas
        /// for claims/fails they are separated out.
        fn reconnect_nodes(node_a: &Node, node_b: &Node, pre_all_htlcs: bool, pending_htlc_adds: (i64, i64), pending_htlc_claims: (usize, usize), pending_cell_htlc_claims: (usize, usize), pending_cell_htlc_fails: (usize, usize), pending_raa: (bool, bool)) {
-               let reestablish_1 = node_a.node.peer_connected(&node_b.node.get_our_node_id());
-               let reestablish_2 = node_b.node.peer_connected(&node_a.node.get_our_node_id());
+               node_a.node.peer_connected(&node_b.node.get_our_node_id());
+               let reestablish_1 = get_chan_reestablish_msgs!(node_a, node_b);
+               node_b.node.peer_connected(&node_a.node.get_our_node_id());
+               let reestablish_2 = get_chan_reestablish_msgs!(node_b, node_a);
 
                let mut resp_1 = Vec::new();
                for msg in reestablish_1 {
@@ -5754,9 +5776,11 @@ mod tests {
                nodes[0].node.peer_disconnected(&nodes[1].node.get_our_node_id(), false);
                nodes[1].node.peer_disconnected(&nodes[0].node.get_our_node_id(), false);
 
-               let reestablish_1 = nodes[0].node.peer_connected(&nodes[1].node.get_our_node_id());
+               nodes[0].node.peer_connected(&nodes[1].node.get_our_node_id());
+               let reestablish_1 = get_chan_reestablish_msgs!(nodes[0], nodes[1]);
                assert_eq!(reestablish_1.len(), 1);
-               let reestablish_2 = nodes[1].node.peer_connected(&nodes[0].node.get_our_node_id());
+               nodes[1].node.peer_connected(&nodes[0].node.get_our_node_id());
+               let reestablish_2 = get_chan_reestablish_msgs!(nodes[1], nodes[0]);
                assert_eq!(reestablish_2.len(), 1);
 
                nodes[0].node.handle_channel_reestablish(&nodes[1].node.get_our_node_id(), &reestablish_2[0]).unwrap();
@@ -6042,9 +6066,11 @@ mod tests {
                        nodes[0].node.peer_disconnected(&nodes[1].node.get_our_node_id(), false);
                        nodes[1].node.peer_disconnected(&nodes[0].node.get_our_node_id(), false);
 
-                       let reestablish_1 = nodes[0].node.peer_connected(&nodes[1].node.get_our_node_id());
+                       nodes[0].node.peer_connected(&nodes[1].node.get_our_node_id());
+                       let reestablish_1 = get_chan_reestablish_msgs!(nodes[0], nodes[1]);
                        assert_eq!(reestablish_1.len(), 1);
-                       let reestablish_2 = nodes[1].node.peer_connected(&nodes[0].node.get_our_node_id());
+                       nodes[1].node.peer_connected(&nodes[0].node.get_our_node_id());
+                       let reestablish_2 = get_chan_reestablish_msgs!(nodes[1], nodes[0]);
                        assert_eq!(reestablish_2.len(), 1);
 
                        nodes[0].node.handle_channel_reestablish(&nodes[1].node.get_our_node_id(), &reestablish_2[0]).unwrap();
@@ -6062,9 +6088,11 @@ mod tests {
                        assert!(nodes[0].node.get_and_clear_pending_events().is_empty());
                        assert!(nodes[0].node.get_and_clear_pending_msg_events().is_empty());
 
-                       let reestablish_1 = nodes[0].node.peer_connected(&nodes[1].node.get_our_node_id());
+                       nodes[0].node.peer_connected(&nodes[1].node.get_our_node_id());
+                       let reestablish_1 = get_chan_reestablish_msgs!(nodes[0], nodes[1]);
                        assert_eq!(reestablish_1.len(), 1);
-                       let reestablish_2 = nodes[1].node.peer_connected(&nodes[0].node.get_our_node_id());
+                       nodes[1].node.peer_connected(&nodes[0].node.get_our_node_id());
+                       let reestablish_2 = get_chan_reestablish_msgs!(nodes[1], nodes[0]);
                        assert_eq!(reestablish_2.len(), 1);
 
                        nodes[0].node.handle_channel_reestablish(&nodes[1].node.get_our_node_id(), &reestablish_2[0]).unwrap();
index 6335dc0686717a5aa38085880b71f642fa6b7221..110e2f336adb7e112fe5104c67a35db8aedf22e7 100644 (file)
@@ -308,14 +308,14 @@ pub struct UpdateFee {
        pub(crate) feerate_per_kw: u32,
 }
 
-#[derive(PartialEq)]
+#[derive(PartialEq, Clone)]
 pub(crate) struct DataLossProtect {
        pub(crate) your_last_per_commitment_secret: [u8; 32],
        pub(crate) my_current_per_commitment_point: PublicKey,
 }
 
 /// A channel_reestablish message to be sent or received from a peer
-#[derive(PartialEq)]
+#[derive(PartialEq, Clone)]
 pub struct ChannelReestablish {
        pub(crate) channel_id: [u8; 32],
        pub(crate) next_local_commitment_number: u64,
@@ -563,7 +563,7 @@ pub trait ChannelMessageHandler : events::MessageSendEventsProvider + Send + Syn
        fn peer_disconnected(&self, their_node_id: &PublicKey, no_connection_possible: bool);
 
        /// Handle a peer reconnecting, possibly generating channel_reestablish message(s).
-       fn peer_connected(&self, their_node_id: &PublicKey) -> Vec<ChannelReestablish>;
+       fn peer_connected(&self, their_node_id: &PublicKey);
        /// Handle an incoming channel_reestablish message from the given peer.
        fn handle_channel_reestablish(&self, their_node_id: &PublicKey, msg: &ChannelReestablish) -> Result<(), HandleError>;
 
index 5c7e23c6beaf68e229d0071e64df0f61303e7f38..4471ea0025807d4ad63c71338785ca6b6897cffe 100644 (file)
@@ -520,9 +520,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
                                                                                                        }, 16);
                                                                                                }
 
-                                                                                               for msg in self.message_handler.chan_handler.peer_connected(&peer.their_node_id.unwrap()) {
-                                                                                                       encode_and_send_msg!(msg, 136);
-                                                                                               }
+                                                                                               self.message_handler.chan_handler.peer_connected(&peer.their_node_id.unwrap());
                                                                                        },
                                                                                        17 => {
                                                                                                let msg = try_potential_decodeerror!(msgs::ErrorMessage::read(&mut reader));
@@ -834,6 +832,16 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
                                                peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 38)));
                                                Self::do_attempt_write_data(&mut descriptor, peer);
                                        },
+                                       MessageSendEvent::SendChannelReestablish { ref node_id, ref msg } => {
+                                               log_trace!(self, "Handling SendChannelReestablish event in peer_handler for node {} for channel {}",
+                                                               log_pubkey!(node_id),
+                                                               log_bytes!(msg.channel_id));
+                                               let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, {
+                                                               //TODO: Do whatever we're gonna do for handling dropped messages
+                                                       });
+                                               peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 136)));
+                                               Self::do_attempt_write_data(&mut descriptor, peer);
+                                       },
                                        MessageSendEvent::BroadcastChannelAnnouncement { ref msg, ref update_msg } => {
                                                log_trace!(self, "Handling BroadcastChannelAnnouncement event in peer_handler for short channel id {}", msg.contents.short_channel_id);
                                                if self.message_handler.route_handler.handle_channel_announcement(msg).is_ok() && self.message_handler.route_handler.handle_channel_update(update_msg).is_ok() {
index cfe16151a22266cfc5db7b74d8069f92f6b5983e..37cee2eded48778d581f1b808aafbe6ab5989c16 100644 (file)
@@ -175,6 +175,13 @@ pub enum MessageSendEvent {
                /// The message which should be sent.
                msg: msgs::Shutdown,
        },
+       /// Used to indicate that a channel_reestablish message should be sent to the peer with the given node_id.
+       SendChannelReestablish {
+               /// The node_id of the node which should receive this message
+               node_id: PublicKey,
+               /// The message which should be sent.
+               msg: msgs::ChannelReestablish,
+       },
        /// Used to indicate that a channel_announcement and channel_update should be broadcast to all
        /// peers (except the peer with node_id either msg.contents.node_id_1 or msg.contents.node_id_2).
        BroadcastChannelAnnouncement {
index d20908a38085a7daded1132d392d491ece0933a4..4fa29fd8e95c94b3a52850dc5867aa4f0f1caed0 100644 (file)
@@ -135,9 +135,7 @@ impl msgs::ChannelMessageHandler for TestChannelMessageHandler {
                Err(HandleError { err: "", action: None })
        }
        fn peer_disconnected(&self, _their_node_id: &PublicKey, _no_connection_possible: bool) {}
-       fn peer_connected(&self, _their_node_id: &PublicKey) -> Vec<msgs::ChannelReestablish> {
-               Vec::new()
-       }
+       fn peer_connected(&self, _their_node_id: &PublicKey) {}
        fn handle_error(&self, _their_node_id: &PublicKey, _msg: &msgs::ErrorMessage) {}
 }