From: Matt Corallo Date: Sat, 20 Oct 2018 21:50:34 +0000 (-0400) Subject: Send channel_reestablish out-of-band to ensure ordered deliver X-Git-Tag: v0.0.12~285^2~1 X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=commitdiff_plain;h=249aa7755038172279781caf47f681c01843dc73;p=rust-lightning Send channel_reestablish out-of-band to ensure ordered deliver --- diff --git a/src/ln/channelmanager.rs b/src/ln/channelmanager.rs index 7d11fe987..c5356e291 100644 --- a/src/ln/channelmanager.rs +++ b/src/ln/channelmanager.rs @@ -2676,9 +2676,10 @@ impl ChannelMessageHandler for ChannelManager { } } - fn peer_connected(&self, their_node_id: &PublicKey) -> Vec { - let mut res = Vec::new(); - let mut channel_state = self.channel_state.lock().unwrap(); + fn peer_connected(&self, their_node_id: &PublicKey) { + let mut channel_state_lock = self.channel_state.lock().unwrap(); + let channel_state = channel_state_lock.borrow_parts(); + let pending_msg_events = channel_state.pending_msg_events; channel_state.by_id.retain(|_, chan| { if chan.get_their_node_id() == *their_node_id { if !chan.have_received_message() { @@ -2688,13 +2689,15 @@ impl ChannelMessageHandler for ChannelManager { // drop it. false } else { - res.push(chan.get_channel_reestablish()); + pending_msg_events.push(events::MessageSendEvent::SendChannelReestablish { + node_id: chan.get_their_node_id(), + msg: chan.get_channel_reestablish(), + }); true } } else { true } }); //TODO: Also re-broadcast announcement_signatures - res } fn handle_error(&self, their_node_id: &PublicKey, msg: &msgs::ErrorMessage) { @@ -5197,6 +5200,23 @@ mod tests { assert_eq!(channel_state.short_to_id.len(), 0); } + macro_rules! get_chan_reestablish_msgs { + ($src_node: expr, $dst_node: expr) => { + { + let mut res = Vec::with_capacity(1); + for msg in $src_node.node.get_and_clear_pending_msg_events() { + if let MessageSendEvent::SendChannelReestablish { ref node_id, ref msg } = msg { + assert_eq!(*node_id, $dst_node.node.get_our_node_id()); + res.push(msg.clone()); + } else { + panic!("Unexpected event") + } + } + res + } + } + } + macro_rules! handle_chan_reestablish_msgs { ($src_node: expr, $dst_node: expr) => { { @@ -5255,8 +5275,10 @@ mod tests { /// pending_htlc_adds includes both the holding cell and in-flight update_add_htlcs, whereas /// for claims/fails they are separated out. fn reconnect_nodes(node_a: &Node, node_b: &Node, pre_all_htlcs: bool, pending_htlc_adds: (i64, i64), pending_htlc_claims: (usize, usize), pending_cell_htlc_claims: (usize, usize), pending_cell_htlc_fails: (usize, usize), pending_raa: (bool, bool)) { - let reestablish_1 = node_a.node.peer_connected(&node_b.node.get_our_node_id()); - let reestablish_2 = node_b.node.peer_connected(&node_a.node.get_our_node_id()); + node_a.node.peer_connected(&node_b.node.get_our_node_id()); + let reestablish_1 = get_chan_reestablish_msgs!(node_a, node_b); + node_b.node.peer_connected(&node_a.node.get_our_node_id()); + let reestablish_2 = get_chan_reestablish_msgs!(node_b, node_a); let mut resp_1 = Vec::new(); for msg in reestablish_1 { @@ -5754,9 +5776,11 @@ mod tests { nodes[0].node.peer_disconnected(&nodes[1].node.get_our_node_id(), false); nodes[1].node.peer_disconnected(&nodes[0].node.get_our_node_id(), false); - let reestablish_1 = nodes[0].node.peer_connected(&nodes[1].node.get_our_node_id()); + nodes[0].node.peer_connected(&nodes[1].node.get_our_node_id()); + let reestablish_1 = get_chan_reestablish_msgs!(nodes[0], nodes[1]); assert_eq!(reestablish_1.len(), 1); - let reestablish_2 = nodes[1].node.peer_connected(&nodes[0].node.get_our_node_id()); + nodes[1].node.peer_connected(&nodes[0].node.get_our_node_id()); + let reestablish_2 = get_chan_reestablish_msgs!(nodes[1], nodes[0]); assert_eq!(reestablish_2.len(), 1); nodes[0].node.handle_channel_reestablish(&nodes[1].node.get_our_node_id(), &reestablish_2[0]).unwrap(); @@ -6042,9 +6066,11 @@ mod tests { nodes[0].node.peer_disconnected(&nodes[1].node.get_our_node_id(), false); nodes[1].node.peer_disconnected(&nodes[0].node.get_our_node_id(), false); - let reestablish_1 = nodes[0].node.peer_connected(&nodes[1].node.get_our_node_id()); + nodes[0].node.peer_connected(&nodes[1].node.get_our_node_id()); + let reestablish_1 = get_chan_reestablish_msgs!(nodes[0], nodes[1]); assert_eq!(reestablish_1.len(), 1); - let reestablish_2 = nodes[1].node.peer_connected(&nodes[0].node.get_our_node_id()); + nodes[1].node.peer_connected(&nodes[0].node.get_our_node_id()); + let reestablish_2 = get_chan_reestablish_msgs!(nodes[1], nodes[0]); assert_eq!(reestablish_2.len(), 1); nodes[0].node.handle_channel_reestablish(&nodes[1].node.get_our_node_id(), &reestablish_2[0]).unwrap(); @@ -6062,9 +6088,11 @@ mod tests { assert!(nodes[0].node.get_and_clear_pending_events().is_empty()); assert!(nodes[0].node.get_and_clear_pending_msg_events().is_empty()); - let reestablish_1 = nodes[0].node.peer_connected(&nodes[1].node.get_our_node_id()); + nodes[0].node.peer_connected(&nodes[1].node.get_our_node_id()); + let reestablish_1 = get_chan_reestablish_msgs!(nodes[0], nodes[1]); assert_eq!(reestablish_1.len(), 1); - let reestablish_2 = nodes[1].node.peer_connected(&nodes[0].node.get_our_node_id()); + nodes[1].node.peer_connected(&nodes[0].node.get_our_node_id()); + let reestablish_2 = get_chan_reestablish_msgs!(nodes[1], nodes[0]); assert_eq!(reestablish_2.len(), 1); nodes[0].node.handle_channel_reestablish(&nodes[1].node.get_our_node_id(), &reestablish_2[0]).unwrap(); diff --git a/src/ln/msgs.rs b/src/ln/msgs.rs index 6335dc068..110e2f336 100644 --- a/src/ln/msgs.rs +++ b/src/ln/msgs.rs @@ -308,14 +308,14 @@ pub struct UpdateFee { pub(crate) feerate_per_kw: u32, } -#[derive(PartialEq)] +#[derive(PartialEq, Clone)] pub(crate) struct DataLossProtect { pub(crate) your_last_per_commitment_secret: [u8; 32], pub(crate) my_current_per_commitment_point: PublicKey, } /// A channel_reestablish message to be sent or received from a peer -#[derive(PartialEq)] +#[derive(PartialEq, Clone)] pub struct ChannelReestablish { pub(crate) channel_id: [u8; 32], pub(crate) next_local_commitment_number: u64, @@ -563,7 +563,7 @@ pub trait ChannelMessageHandler : events::MessageSendEventsProvider + Send + Syn fn peer_disconnected(&self, their_node_id: &PublicKey, no_connection_possible: bool); /// Handle a peer reconnecting, possibly generating channel_reestablish message(s). - fn peer_connected(&self, their_node_id: &PublicKey) -> Vec; + fn peer_connected(&self, their_node_id: &PublicKey); /// Handle an incoming channel_reestablish message from the given peer. fn handle_channel_reestablish(&self, their_node_id: &PublicKey, msg: &ChannelReestablish) -> Result<(), HandleError>; diff --git a/src/ln/peer_handler.rs b/src/ln/peer_handler.rs index 5c7e23c6b..4471ea002 100644 --- a/src/ln/peer_handler.rs +++ b/src/ln/peer_handler.rs @@ -520,9 +520,7 @@ impl PeerManager { }, 16); } - for msg in self.message_handler.chan_handler.peer_connected(&peer.their_node_id.unwrap()) { - encode_and_send_msg!(msg, 136); - } + self.message_handler.chan_handler.peer_connected(&peer.their_node_id.unwrap()); }, 17 => { let msg = try_potential_decodeerror!(msgs::ErrorMessage::read(&mut reader)); @@ -834,6 +832,16 @@ impl PeerManager { peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 38))); Self::do_attempt_write_data(&mut descriptor, peer); }, + MessageSendEvent::SendChannelReestablish { ref node_id, ref msg } => { + log_trace!(self, "Handling SendChannelReestablish event in peer_handler for node {} for channel {}", + log_pubkey!(node_id), + log_bytes!(msg.channel_id)); + let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, { + //TODO: Do whatever we're gonna do for handling dropped messages + }); + peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 136))); + Self::do_attempt_write_data(&mut descriptor, peer); + }, MessageSendEvent::BroadcastChannelAnnouncement { ref msg, ref update_msg } => { log_trace!(self, "Handling BroadcastChannelAnnouncement event in peer_handler for short channel id {}", msg.contents.short_channel_id); if self.message_handler.route_handler.handle_channel_announcement(msg).is_ok() && self.message_handler.route_handler.handle_channel_update(update_msg).is_ok() { diff --git a/src/util/events.rs b/src/util/events.rs index cfe16151a..37cee2ede 100644 --- a/src/util/events.rs +++ b/src/util/events.rs @@ -175,6 +175,13 @@ pub enum MessageSendEvent { /// The message which should be sent. msg: msgs::Shutdown, }, + /// Used to indicate that a channel_reestablish message should be sent to the peer with the given node_id. + SendChannelReestablish { + /// The node_id of the node which should receive this message + node_id: PublicKey, + /// The message which should be sent. + msg: msgs::ChannelReestablish, + }, /// Used to indicate that a channel_announcement and channel_update should be broadcast to all /// peers (except the peer with node_id either msg.contents.node_id_1 or msg.contents.node_id_2). BroadcastChannelAnnouncement { diff --git a/src/util/test_utils.rs b/src/util/test_utils.rs index d20908a38..4fa29fd8e 100644 --- a/src/util/test_utils.rs +++ b/src/util/test_utils.rs @@ -135,9 +135,7 @@ impl msgs::ChannelMessageHandler for TestChannelMessageHandler { Err(HandleError { err: "", action: None }) } fn peer_disconnected(&self, _their_node_id: &PublicKey, _no_connection_possible: bool) {} - fn peer_connected(&self, _their_node_id: &PublicKey) -> Vec { - Vec::new() - } + fn peer_connected(&self, _their_node_id: &PublicKey) {} fn handle_error(&self, _their_node_id: &PublicKey, _msg: &msgs::ErrorMessage) {} }