}
}
- fn peer_connected(&self, their_node_id: &PublicKey) -> Vec<msgs::ChannelReestablish> {
- let mut res = Vec::new();
- let mut channel_state = self.channel_state.lock().unwrap();
+ fn peer_connected(&self, their_node_id: &PublicKey) {
+ let mut channel_state_lock = self.channel_state.lock().unwrap();
+ let channel_state = channel_state_lock.borrow_parts();
+ let pending_msg_events = channel_state.pending_msg_events;
channel_state.by_id.retain(|_, chan| {
if chan.get_their_node_id() == *their_node_id {
if !chan.have_received_message() {
// drop it.
false
} else {
- res.push(chan.get_channel_reestablish());
+ pending_msg_events.push(events::MessageSendEvent::SendChannelReestablish {
+ node_id: chan.get_their_node_id(),
+ msg: chan.get_channel_reestablish(),
+ });
true
}
} else { true }
});
//TODO: Also re-broadcast announcement_signatures
- res
}
fn handle_error(&self, their_node_id: &PublicKey, msg: &msgs::ErrorMessage) {
assert_eq!(channel_state.short_to_id.len(), 0);
}
+ macro_rules! get_chan_reestablish_msgs {
+ ($src_node: expr, $dst_node: expr) => {
+ {
+ let mut res = Vec::with_capacity(1);
+ for msg in $src_node.node.get_and_clear_pending_msg_events() {
+ if let MessageSendEvent::SendChannelReestablish { ref node_id, ref msg } = msg {
+ assert_eq!(*node_id, $dst_node.node.get_our_node_id());
+ res.push(msg.clone());
+ } else {
+ panic!("Unexpected event")
+ }
+ }
+ res
+ }
+ }
+ }
+
macro_rules! handle_chan_reestablish_msgs {
($src_node: expr, $dst_node: expr) => {
{
/// pending_htlc_adds includes both the holding cell and in-flight update_add_htlcs, whereas
/// for claims/fails they are separated out.
fn reconnect_nodes(node_a: &Node, node_b: &Node, pre_all_htlcs: bool, pending_htlc_adds: (i64, i64), pending_htlc_claims: (usize, usize), pending_cell_htlc_claims: (usize, usize), pending_cell_htlc_fails: (usize, usize), pending_raa: (bool, bool)) {
- let reestablish_1 = node_a.node.peer_connected(&node_b.node.get_our_node_id());
- let reestablish_2 = node_b.node.peer_connected(&node_a.node.get_our_node_id());
+ node_a.node.peer_connected(&node_b.node.get_our_node_id());
+ let reestablish_1 = get_chan_reestablish_msgs!(node_a, node_b);
+ node_b.node.peer_connected(&node_a.node.get_our_node_id());
+ let reestablish_2 = get_chan_reestablish_msgs!(node_b, node_a);
let mut resp_1 = Vec::new();
for msg in reestablish_1 {
nodes[0].node.peer_disconnected(&nodes[1].node.get_our_node_id(), false);
nodes[1].node.peer_disconnected(&nodes[0].node.get_our_node_id(), false);
- let reestablish_1 = nodes[0].node.peer_connected(&nodes[1].node.get_our_node_id());
+ nodes[0].node.peer_connected(&nodes[1].node.get_our_node_id());
+ let reestablish_1 = get_chan_reestablish_msgs!(nodes[0], nodes[1]);
assert_eq!(reestablish_1.len(), 1);
- let reestablish_2 = nodes[1].node.peer_connected(&nodes[0].node.get_our_node_id());
+ nodes[1].node.peer_connected(&nodes[0].node.get_our_node_id());
+ let reestablish_2 = get_chan_reestablish_msgs!(nodes[1], nodes[0]);
assert_eq!(reestablish_2.len(), 1);
nodes[0].node.handle_channel_reestablish(&nodes[1].node.get_our_node_id(), &reestablish_2[0]).unwrap();
nodes[0].node.peer_disconnected(&nodes[1].node.get_our_node_id(), false);
nodes[1].node.peer_disconnected(&nodes[0].node.get_our_node_id(), false);
- let reestablish_1 = nodes[0].node.peer_connected(&nodes[1].node.get_our_node_id());
+ nodes[0].node.peer_connected(&nodes[1].node.get_our_node_id());
+ let reestablish_1 = get_chan_reestablish_msgs!(nodes[0], nodes[1]);
assert_eq!(reestablish_1.len(), 1);
- let reestablish_2 = nodes[1].node.peer_connected(&nodes[0].node.get_our_node_id());
+ nodes[1].node.peer_connected(&nodes[0].node.get_our_node_id());
+ let reestablish_2 = get_chan_reestablish_msgs!(nodes[1], nodes[0]);
assert_eq!(reestablish_2.len(), 1);
nodes[0].node.handle_channel_reestablish(&nodes[1].node.get_our_node_id(), &reestablish_2[0]).unwrap();
assert!(nodes[0].node.get_and_clear_pending_events().is_empty());
assert!(nodes[0].node.get_and_clear_pending_msg_events().is_empty());
- let reestablish_1 = nodes[0].node.peer_connected(&nodes[1].node.get_our_node_id());
+ nodes[0].node.peer_connected(&nodes[1].node.get_our_node_id());
+ let reestablish_1 = get_chan_reestablish_msgs!(nodes[0], nodes[1]);
assert_eq!(reestablish_1.len(), 1);
- let reestablish_2 = nodes[1].node.peer_connected(&nodes[0].node.get_our_node_id());
+ nodes[1].node.peer_connected(&nodes[0].node.get_our_node_id());
+ let reestablish_2 = get_chan_reestablish_msgs!(nodes[1], nodes[0]);
assert_eq!(reestablish_2.len(), 1);
nodes[0].node.handle_channel_reestablish(&nodes[1].node.get_our_node_id(), &reestablish_2[0]).unwrap();
pub(crate) feerate_per_kw: u32,
}
-#[derive(PartialEq)]
+#[derive(PartialEq, Clone)]
pub(crate) struct DataLossProtect {
pub(crate) your_last_per_commitment_secret: [u8; 32],
pub(crate) my_current_per_commitment_point: PublicKey,
}
/// A channel_reestablish message to be sent or received from a peer
-#[derive(PartialEq)]
+#[derive(PartialEq, Clone)]
pub struct ChannelReestablish {
pub(crate) channel_id: [u8; 32],
pub(crate) next_local_commitment_number: u64,
fn peer_disconnected(&self, their_node_id: &PublicKey, no_connection_possible: bool);
/// Handle a peer reconnecting, possibly generating channel_reestablish message(s).
- fn peer_connected(&self, their_node_id: &PublicKey) -> Vec<ChannelReestablish>;
+ fn peer_connected(&self, their_node_id: &PublicKey);
/// Handle an incoming channel_reestablish message from the given peer.
fn handle_channel_reestablish(&self, their_node_id: &PublicKey, msg: &ChannelReestablish) -> Result<(), HandleError>;
}, 16);
}
- for msg in self.message_handler.chan_handler.peer_connected(&peer.their_node_id.unwrap()) {
- encode_and_send_msg!(msg, 136);
- }
+ self.message_handler.chan_handler.peer_connected(&peer.their_node_id.unwrap());
},
17 => {
let msg = try_potential_decodeerror!(msgs::ErrorMessage::read(&mut reader));
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 38)));
Self::do_attempt_write_data(&mut descriptor, peer);
},
+ MessageSendEvent::SendChannelReestablish { ref node_id, ref msg } => {
+ log_trace!(self, "Handling SendChannelReestablish event in peer_handler for node {} for channel {}",
+ log_pubkey!(node_id),
+ log_bytes!(msg.channel_id));
+ let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, {
+ //TODO: Do whatever we're gonna do for handling dropped messages
+ });
+ peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 136)));
+ Self::do_attempt_write_data(&mut descriptor, peer);
+ },
MessageSendEvent::BroadcastChannelAnnouncement { ref msg, ref update_msg } => {
log_trace!(self, "Handling BroadcastChannelAnnouncement event in peer_handler for short channel id {}", msg.contents.short_channel_id);
if self.message_handler.route_handler.handle_channel_announcement(msg).is_ok() && self.message_handler.route_handler.handle_channel_update(update_msg).is_ok() {