From 812f255d7c53c92656493fe103ad4129d1363a90 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Fri, 19 Oct 2018 21:50:16 -0400 Subject: [PATCH] Send shutdown/closing_signed msgs out-of-band for ordered delivery --- src/ln/channelmanager.rs | 149 +++++++++++++++++++++++++-------------- src/ln/msgs.rs | 6 +- src/ln/peer_handler.rs | 23 +++--- src/util/events.rs | 7 ++ src/util/test_utils.rs | 4 +- 5 files changed, 120 insertions(+), 69 deletions(-) diff --git a/src/ln/channelmanager.rs b/src/ln/channelmanager.rs index 4777a3e81..ace557528 100644 --- a/src/ln/channelmanager.rs +++ b/src/ln/channelmanager.rs @@ -1758,8 +1758,8 @@ impl ChannelManager { } } - fn internal_shutdown(&self, their_node_id: &PublicKey, msg: &msgs::Shutdown) -> Result<(Option, Option), MsgHandleErrInternal> { - let (mut res, chan_option) = { + fn internal_shutdown(&self, their_node_id: &PublicKey, msg: &msgs::Shutdown) -> Result<(), MsgHandleErrInternal> { + let (mut dropped_htlcs, chan_option) = { let mut channel_state_lock = self.channel_state.lock().unwrap(); let channel_state = channel_state_lock.borrow_parts(); @@ -1769,18 +1769,30 @@ impl ChannelManager { //TODO: here and below MsgHandleErrInternal, #153 case return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!", msg.channel_id)); } - let res = chan_entry.get_mut().shutdown(&*self.fee_estimator, &msg).map_err(|e| MsgHandleErrInternal::from_maybe_close(e))?; + let (shutdown, closing_signed, dropped_htlcs) = chan_entry.get_mut().shutdown(&*self.fee_estimator, &msg).map_err(|e| MsgHandleErrInternal::from_maybe_close(e))?; + if let Some(msg) = shutdown { + channel_state.pending_msg_events.push(events::MessageSendEvent::SendShutdown { + node_id: their_node_id.clone(), + msg, + }); + } + if let Some(msg) = closing_signed { + channel_state.pending_msg_events.push(events::MessageSendEvent::SendClosingSigned { + node_id: their_node_id.clone(), + msg, + }); + } if chan_entry.get().is_shutdown() { if let Some(short_id) = chan_entry.get().get_short_channel_id() { channel_state.short_to_id.remove(&short_id); } - (res, Some(chan_entry.remove_entry().1)) - } else { (res, None) } + (dropped_htlcs, Some(chan_entry.remove_entry().1)) + } else { (dropped_htlcs, None) } }, hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel", msg.channel_id)) } }; - for htlc_source in res.2.drain(..) { + for htlc_source in dropped_htlcs.drain(..) { // unknown_next_peer...I dunno who that is anymore.... self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), htlc_source.0, &htlc_source.1, HTLCFailReason::Reason { failure_code: 0x4000 | 10, data: Vec::new() }); } @@ -1792,11 +1804,11 @@ impl ChannelManager { }); } } - Ok((res.0, res.1)) + Ok(()) } - fn internal_closing_signed(&self, their_node_id: &PublicKey, msg: &msgs::ClosingSigned) -> Result, MsgHandleErrInternal> { - let (res, chan_option) = { + fn internal_closing_signed(&self, their_node_id: &PublicKey, msg: &msgs::ClosingSigned) -> Result<(), MsgHandleErrInternal> { + let (tx, chan_option) = { let mut channel_state_lock = self.channel_state.lock().unwrap(); let channel_state = channel_state_lock.borrow_parts(); match channel_state.by_id.entry(msg.channel_id.clone()) { @@ -1805,8 +1817,14 @@ impl ChannelManager { //TODO: here and below MsgHandleErrInternal, #153 case return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!", msg.channel_id)); } - let res = chan_entry.get_mut().closing_signed(&*self.fee_estimator, &msg).map_err(|e| MsgHandleErrInternal::from_maybe_close(e))?; - if res.1.is_some() { + let (closing_signed, tx) = chan_entry.get_mut().closing_signed(&*self.fee_estimator, &msg).map_err(|e| MsgHandleErrInternal::from_maybe_close(e))?; + if let Some(msg) = closing_signed { + channel_state.pending_msg_events.push(events::MessageSendEvent::SendClosingSigned { + node_id: their_node_id.clone(), + msg, + }); + } + if tx.is_some() { // We're done with this channel, we've got a signed closing transaction and // will send the closing_signed back to the remote peer upon return. This // also implies there are no pending HTLCs left on the channel, so we can @@ -1815,13 +1833,13 @@ impl ChannelManager { if let Some(short_id) = chan_entry.get().get_short_channel_id() { channel_state.short_to_id.remove(&short_id); } - (res, Some(chan_entry.remove_entry().1)) - } else { (res, None) } + (tx, Some(chan_entry.remove_entry().1)) + } else { (tx, None) } }, hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel", msg.channel_id)) } }; - if let Some(broadcast_tx) = res.1 { + if let Some(broadcast_tx) = tx { self.tx_broadcaster.broadcast_transaction(&broadcast_tx); } if let Some(chan) = chan_option { @@ -1832,7 +1850,7 @@ impl ChannelManager { }); } } - Ok(res.0) + Ok(()) } fn internal_update_add_htlc(&self, their_node_id: &PublicKey, msg: &msgs::UpdateAddHTLC) -> Result<(), MsgHandleErrInternal> { @@ -2495,11 +2513,11 @@ impl ChannelMessageHandler for ChannelManager { handle_error!(self, self.internal_funding_locked(their_node_id, msg), their_node_id) } - fn handle_shutdown(&self, their_node_id: &PublicKey, msg: &msgs::Shutdown) -> Result<(Option, Option), HandleError> { + fn handle_shutdown(&self, their_node_id: &PublicKey, msg: &msgs::Shutdown) -> Result<(), HandleError> { handle_error!(self, self.internal_shutdown(their_node_id, msg), their_node_id) } - fn handle_closing_signed(&self, their_node_id: &PublicKey, msg: &msgs::ClosingSigned) -> Result, HandleError> { + fn handle_closing_signed(&self, their_node_id: &PublicKey, msg: &msgs::ClosingSigned) -> Result<(), HandleError> { handle_error!(self, self.internal_closing_signed(their_node_id, msg), their_node_id) } @@ -3025,68 +3043,91 @@ mod tests { } fn close_channel(outbound_node: &Node, inbound_node: &Node, channel_id: &[u8; 32], funding_tx: Transaction, close_inbound_first: bool) -> (msgs::ChannelUpdate, msgs::ChannelUpdate) { - let (node_a, broadcaster_a) = if close_inbound_first { (&inbound_node.node, &inbound_node.tx_broadcaster) } else { (&outbound_node.node, &outbound_node.tx_broadcaster) }; + let (node_a, broadcaster_a, struct_a) = if close_inbound_first { (&inbound_node.node, &inbound_node.tx_broadcaster, inbound_node) } else { (&outbound_node.node, &outbound_node.tx_broadcaster, outbound_node) }; let (node_b, broadcaster_b) = if close_inbound_first { (&outbound_node.node, &outbound_node.tx_broadcaster) } else { (&inbound_node.node, &inbound_node.tx_broadcaster) }; let (tx_a, tx_b); node_a.close_channel(channel_id).unwrap(); - let events_1 = node_a.get_and_clear_pending_msg_events(); - assert_eq!(events_1.len(), 1); - let shutdown_a = match events_1[0] { + node_b.handle_shutdown(&node_a.get_our_node_id(), &get_event_msg!(struct_a, MessageSendEvent::SendShutdown, node_b.get_our_node_id())).unwrap(); + + let events_1 = node_b.get_and_clear_pending_msg_events(); + assert!(events_1.len() >= 1); + let shutdown_b = match events_1[0] { MessageSendEvent::SendShutdown { ref node_id, ref msg } => { - assert_eq!(node_id, &node_b.get_our_node_id()); + assert_eq!(node_id, &node_a.get_our_node_id()); msg.clone() }, _ => panic!("Unexpected event"), }; - let (shutdown_b, mut closing_signed_b) = node_b.handle_shutdown(&node_a.get_our_node_id(), &shutdown_a).unwrap(); - if !close_inbound_first { - assert!(closing_signed_b.is_none()); + let closing_signed_b = if !close_inbound_first { + assert_eq!(events_1.len(), 1); + None + } else { + Some(match events_1[1] { + MessageSendEvent::SendClosingSigned { ref node_id, ref msg } => { + assert_eq!(node_id, &node_a.get_our_node_id()); + msg.clone() + }, + _ => panic!("Unexpected event"), + }) + }; + + macro_rules! get_closing_signed_broadcast { + ($node: expr, $dest_pubkey: expr) => { + { + let events = $node.get_and_clear_pending_msg_events(); + assert!(events.len() == 1 || events.len() == 2); + (match events[events.len() - 1] { + MessageSendEvent::BroadcastChannelUpdate { ref msg } => { + msg.clone() + }, + _ => panic!("Unexpected event"), + }, if events.len() == 2 { + match events[0] { + MessageSendEvent::SendClosingSigned { ref node_id, ref msg } => { + assert_eq!(*node_id, $dest_pubkey); + Some(msg.clone()) + }, + _ => panic!("Unexpected event"), + } + } else { None }) + } + } } - let (empty_a, mut closing_signed_a) = node_a.handle_shutdown(&node_b.get_our_node_id(), &shutdown_b.unwrap()).unwrap(); - assert!(empty_a.is_none()); - if close_inbound_first { - assert!(closing_signed_a.is_none()); - closing_signed_a = node_a.handle_closing_signed(&node_b.get_our_node_id(), &closing_signed_b.unwrap()).unwrap(); + + node_a.handle_shutdown(&node_b.get_our_node_id(), &shutdown_b).unwrap(); + let (as_update, bs_update) = if close_inbound_first { + assert!(node_a.get_and_clear_pending_msg_events().is_empty()); + node_a.handle_closing_signed(&node_b.get_our_node_id(), &closing_signed_b.unwrap()).unwrap(); assert_eq!(broadcaster_a.txn_broadcasted.lock().unwrap().len(), 1); tx_a = broadcaster_a.txn_broadcasted.lock().unwrap().remove(0); + let (as_update, closing_signed_a) = get_closing_signed_broadcast!(node_a, node_b.get_our_node_id()); - let empty_b = node_b.handle_closing_signed(&node_a.get_our_node_id(), &closing_signed_a.unwrap()).unwrap(); - assert!(empty_b.is_none()); + node_b.handle_closing_signed(&node_a.get_our_node_id(), &closing_signed_a.unwrap()).unwrap(); + let (bs_update, none_b) = get_closing_signed_broadcast!(node_b, node_a.get_our_node_id()); + assert!(none_b.is_none()); assert_eq!(broadcaster_b.txn_broadcasted.lock().unwrap().len(), 1); tx_b = broadcaster_b.txn_broadcasted.lock().unwrap().remove(0); + (as_update, bs_update) } else { - closing_signed_b = node_b.handle_closing_signed(&node_a.get_our_node_id(), &closing_signed_a.unwrap()).unwrap(); + let closing_signed_a = get_event_msg!(struct_a, MessageSendEvent::SendClosingSigned, node_b.get_our_node_id()); + + node_b.handle_closing_signed(&node_a.get_our_node_id(), &closing_signed_a).unwrap(); assert_eq!(broadcaster_b.txn_broadcasted.lock().unwrap().len(), 1); tx_b = broadcaster_b.txn_broadcasted.lock().unwrap().remove(0); + let (bs_update, closing_signed_b) = get_closing_signed_broadcast!(node_b, node_a.get_our_node_id()); - let empty_a2 = node_a.handle_closing_signed(&node_b.get_our_node_id(), &closing_signed_b.unwrap()).unwrap(); - assert!(empty_a2.is_none()); + node_a.handle_closing_signed(&node_b.get_our_node_id(), &closing_signed_b.unwrap()).unwrap(); + let (as_update, none_a) = get_closing_signed_broadcast!(node_a, node_b.get_our_node_id()); + assert!(none_a.is_none()); assert_eq!(broadcaster_a.txn_broadcasted.lock().unwrap().len(), 1); tx_a = broadcaster_a.txn_broadcasted.lock().unwrap().remove(0); - } + (as_update, bs_update) + }; assert_eq!(tx_a, tx_b); check_spends!(tx_a, funding_tx); - let events_2 = node_a.get_and_clear_pending_msg_events(); - assert_eq!(events_2.len(), 1); - let as_update = match events_2[0] { - MessageSendEvent::BroadcastChannelUpdate { ref msg } => { - msg.clone() - }, - _ => panic!("Unexpected event"), - }; - - let events_3 = node_b.get_and_clear_pending_msg_events(); - assert_eq!(events_3.len(), 1); - let bs_update = match events_3[0] { - MessageSendEvent::BroadcastChannelUpdate { ref msg } => { - msg.clone() - }, - _ => panic!("Unexpected event"), - }; - (as_update, bs_update) } diff --git a/src/ln/msgs.rs b/src/ln/msgs.rs index d01595b72..b8fdeb129 100644 --- a/src/ln/msgs.rs +++ b/src/ln/msgs.rs @@ -235,12 +235,14 @@ pub struct FundingLocked { } /// A shutdown message to be sent or received from a peer +#[derive(Clone)] pub struct Shutdown { pub(crate) channel_id: [u8; 32], pub(crate) scriptpubkey: Script, } /// A closing_signed message to be sent or received from a peer +#[derive(Clone)] pub struct ClosingSigned { pub(crate) channel_id: [u8; 32], pub(crate) fee_satoshis: u64, @@ -540,9 +542,9 @@ pub trait ChannelMessageHandler : events::MessageSendEventsProvider + Send + Syn // Channl close: /// Handle an incoming shutdown message from the given peer. - fn handle_shutdown(&self, their_node_id: &PublicKey, msg: &Shutdown) -> Result<(Option, Option), HandleError>; + fn handle_shutdown(&self, their_node_id: &PublicKey, msg: &Shutdown) -> Result<(), HandleError>; /// Handle an incoming closing_signed message from the given peer. - fn handle_closing_signed(&self, their_node_id: &PublicKey, msg: &ClosingSigned) -> Result, HandleError>; + fn handle_closing_signed(&self, their_node_id: &PublicKey, msg: &ClosingSigned) -> Result<(), HandleError>; // HTLC handling: /// Handle an incoming update_add_htlc message from the given peer. diff --git a/src/ln/peer_handler.rs b/src/ln/peer_handler.rs index 9a2cee9dd..5f1a8dc40 100644 --- a/src/ln/peer_handler.rs +++ b/src/ln/peer_handler.rs @@ -581,20 +581,11 @@ impl PeerManager { 38 => { let msg = try_potential_decodeerror!(msgs::Shutdown::read(&mut reader)); - let resp_options = try_potential_handleerror!(self.message_handler.chan_handler.handle_shutdown(&peer.their_node_id.unwrap(), &msg)); - if let Some(resp) = resp_options.0 { - encode_and_send_msg!(resp, 38); - } - if let Some(resp) = resp_options.1 { - encode_and_send_msg!(resp, 39); - } + try_potential_handleerror!(self.message_handler.chan_handler.handle_shutdown(&peer.their_node_id.unwrap(), &msg)); }, 39 => { let msg = try_potential_decodeerror!(msgs::ClosingSigned::read(&mut reader)); - let resp_option = try_potential_handleerror!(self.message_handler.chan_handler.handle_closing_signed(&peer.their_node_id.unwrap(), &msg)); - if let Some(resp) = resp_option { - encode_and_send_msg!(resp, 39); - } + try_potential_handleerror!(self.message_handler.chan_handler.handle_closing_signed(&peer.their_node_id.unwrap(), &msg)); }, 128 => { @@ -883,6 +874,16 @@ impl PeerManager { peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 133))); Self::do_attempt_write_data(&mut descriptor, peer); }, + MessageSendEvent::SendClosingSigned { ref node_id, ref msg } => { + log_trace!(self, "Handling SendClosingSigned event in peer_handler for node {} for channel {}", + log_pubkey!(node_id), + log_bytes!(msg.channel_id)); + let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, { + //TODO: Do whatever we're gonna do for handling dropped messages + }); + peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 39))); + Self::do_attempt_write_data(&mut descriptor, peer); + }, MessageSendEvent::SendShutdown { ref node_id, ref msg } => { log_trace!(self, "Handling Shutdown event in peer_handler for node {} for channel {}", log_pubkey!(node_id), diff --git a/src/util/events.rs b/src/util/events.rs index d7312cd91..cfe16151a 100644 --- a/src/util/events.rs +++ b/src/util/events.rs @@ -161,6 +161,13 @@ pub enum MessageSendEvent { /// The message which should be sent. msg: msgs::RevokeAndACK, }, + /// Used to indicate that a closing_signed message should be sent to the peer with the given node_id. + SendClosingSigned { + /// The node_id of the node which should receive this message + node_id: PublicKey, + /// The message which should be sent. + msg: msgs::ClosingSigned, + }, /// Used to indicate that a shutdown message should be sent to the peer with the given node_id. SendShutdown { /// The node_id of the node which should receive this message diff --git a/src/util/test_utils.rs b/src/util/test_utils.rs index f8959dcf1..123ddd06f 100644 --- a/src/util/test_utils.rs +++ b/src/util/test_utils.rs @@ -101,10 +101,10 @@ impl msgs::ChannelMessageHandler for TestChannelMessageHandler { fn handle_funding_locked(&self, _their_node_id: &PublicKey, _msg: &msgs::FundingLocked) -> Result<(), HandleError> { Err(HandleError { err: "", action: None }) } - fn handle_shutdown(&self, _their_node_id: &PublicKey, _msg: &msgs::Shutdown) -> Result<(Option, Option), HandleError> { + fn handle_shutdown(&self, _their_node_id: &PublicKey, _msg: &msgs::Shutdown) -> Result<(), HandleError> { Err(HandleError { err: "", action: None }) } - fn handle_closing_signed(&self, _their_node_id: &PublicKey, _msg: &msgs::ClosingSigned) -> Result, HandleError> { + fn handle_closing_signed(&self, _their_node_id: &PublicKey, _msg: &msgs::ClosingSigned) -> Result<(), HandleError> { Err(HandleError { err: "", action: None }) } fn handle_update_add_htlc(&self, _their_node_id: &PublicKey, _msg: &msgs::UpdateAddHTLC) -> Result<(), HandleError> { -- 2.39.5