}
}
+enum InitSyncTracker{
+ NoSyncRequested,
+ ChannelsSyncing(u64),
+ NodesSyncing(PublicKey),
+}
+
struct Peer {
channel_encryptor: PeerChannelEncryptor,
outbound: bool,
pending_read_buffer: Vec<u8>,
pending_read_buffer_pos: usize,
pending_read_is_header: bool,
+
+ sync_status: InitSyncTracker,
+}
+
+impl Peer {
+ /// Returns true if the channel announcements/updates for the given channel should be
+ /// forwarded to this peer.
+ /// If we are sending our routing table to this peer and we have not yet sent channel
+ /// announcements/updates for the given channel_id then we will send it when we get to that
+ /// point and we shouldn't send it yet to avoid sending duplicate updates. If we've already
+ /// sent the old versions, we should send the update, and so return true here.
+ fn should_forward_channel(&self, channel_id: u64)->bool{
+ match self.sync_status {
+ InitSyncTracker::NoSyncRequested => true,
+ InitSyncTracker::ChannelsSyncing(i) => i < channel_id,
+ InitSyncTracker::NodesSyncing(_) => true,
+ }
+ }
}
struct PeerHolder<Descriptor: SocketDescriptor> {
if peers.peers.insert(descriptor, Peer {
channel_encryptor: peer_encryptor,
outbound: true,
- their_node_id: Some(their_node_id),
+ their_node_id: None,
their_global_features: None,
their_local_features: None,
pending_read_buffer: pending_read_buffer,
pending_read_buffer_pos: 0,
pending_read_is_header: false,
+
+ sync_status: InitSyncTracker::NoSyncRequested,
}).is_some() {
panic!("PeerManager driver duplicated descriptors!");
};
pending_read_buffer: pending_read_buffer,
pending_read_buffer_pos: 0,
pending_read_is_header: false,
+
+ sync_status: InitSyncTracker::NoSyncRequested,
}).is_some() {
panic!("PeerManager driver duplicated descriptors!");
};
Ok(())
}
- fn do_attempt_write_data(descriptor: &mut Descriptor, peer: &mut Peer) {
+ fn do_attempt_write_data(&self, descriptor: &mut Descriptor, peer: &mut Peer) {
+ macro_rules! encode_and_send_msg {
+ ($msg: expr, $msg_code: expr) => {
+ {
+ log_trace!(self, "Encoding and sending sync update message of type {} to {}", $msg_code, log_pubkey!(peer.their_node_id.unwrap()));
+ peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!($msg, $msg_code)[..]));
+ }
+ }
+ }
+ const MSG_BUFF_SIZE: usize = 10;
while !peer.awaiting_write_event {
+ if peer.pending_outbound_buffer.len() < MSG_BUFF_SIZE {
+ match peer.sync_status {
+ InitSyncTracker::NoSyncRequested => {},
+ InitSyncTracker::ChannelsSyncing(c) if c < 0xffff_ffff_ffff_ffff => {
+ let steps = ((MSG_BUFF_SIZE - peer.pending_outbound_buffer.len() + 2) / 3) as u8;
+ let all_messages = self.message_handler.route_handler.get_next_channel_announcements(0, steps);
+ for &(ref announce, ref update_a, ref update_b) in all_messages.iter() {
+ encode_and_send_msg!(announce, 256);
+ encode_and_send_msg!(update_a, 258);
+ encode_and_send_msg!(update_b, 258);
+ peer.sync_status = InitSyncTracker::ChannelsSyncing(announce.contents.short_channel_id + 1);
+ }
+ if all_messages.is_empty() || all_messages.len() != steps as usize {
+ peer.sync_status = InitSyncTracker::ChannelsSyncing(0xffff_ffff_ffff_ffff);
+ }
+ },
+ InitSyncTracker::ChannelsSyncing(c) if c == 0xffff_ffff_ffff_ffff => {
+ let steps = (MSG_BUFF_SIZE - peer.pending_outbound_buffer.len()) as u8;
+ let all_messages = self.message_handler.route_handler.get_next_node_announcements(None, steps);
+ for msg in all_messages.iter() {
+ encode_and_send_msg!(msg, 256);
+ peer.sync_status = InitSyncTracker::NodesSyncing(msg.contents.node_id);
+ }
+ if all_messages.is_empty() || all_messages.len() != steps as usize {
+ peer.sync_status = InitSyncTracker::NoSyncRequested;
+ }
+ },
+ InitSyncTracker::ChannelsSyncing(_) => unreachable!(),
+ InitSyncTracker::NodesSyncing(key) => {
+ let steps = (MSG_BUFF_SIZE - peer.pending_outbound_buffer.len()) as u8;
+ let all_messages = self.message_handler.route_handler.get_next_node_announcements(Some(&key), steps);
+ for msg in all_messages.iter() {
+ encode_and_send_msg!(msg, 256);
+ peer.sync_status = InitSyncTracker::NodesSyncing(msg.contents.node_id);
+ }
+ if all_messages.is_empty() || all_messages.len() != steps as usize {
+ peer.sync_status = InitSyncTracker::NoSyncRequested;
+ }
+ },
+ }
+ }
+
if {
let next_buff = match peer.pending_outbound_buffer.front() {
None => return,
Some(buff) => buff,
};
- let should_be_reading = peer.pending_outbound_buffer.len() < 10;
+ let should_be_reading = peer.pending_outbound_buffer.len() < MSG_BUFF_SIZE;
let data_sent = descriptor.send_data(next_buff, peer.pending_outbound_buffer_first_msg_offset, should_be_reading);
peer.pending_outbound_buffer_first_msg_offset += data_sent;
if peer.pending_outbound_buffer_first_msg_offset == next_buff.len() { true } else { false }
None => panic!("Descriptor for write_event is not already known to PeerManager"),
Some(peer) => {
peer.awaiting_write_event = false;
- Self::do_attempt_write_data(descriptor, peer);
+ self.do_attempt_write_data(descriptor, peer);
}
};
Ok(())
match e {
msgs::DecodeError::UnknownVersion => return Err(PeerHandleError{ no_connection_possible: false }),
msgs::DecodeError::UnknownRequiredFeature => {
- log_debug!(self, "Got a channel/node announcement with an known required feature flag, you may want to udpate!");
+ log_debug!(self, "Got a channel/node announcement with an known required feature flag, you may want to update!");
continue;
},
- msgs::DecodeError::InvalidValue => return Err(PeerHandleError{ no_connection_possible: false }),
- msgs::DecodeError::ShortRead => return Err(PeerHandleError{ no_connection_possible: false }),
+ msgs::DecodeError::InvalidValue => {
+ log_debug!(self, "Got an invalid value while deserializing message");
+ return Err(PeerHandleError{ no_connection_possible: false });
+ },
+ msgs::DecodeError::ShortRead => {
+ log_debug!(self, "Deserialization failed due to shortness of message");
+ return Err(PeerHandleError{ no_connection_possible: false });
+ },
msgs::DecodeError::ExtraAddressesPerType => {
log_debug!(self, "Error decoding message, ignoring due to lnd spec incompatibility. See https://github.com/lightningnetwork/lnd/issues/1407");
continue;
() => {
match peers.node_id_to_descriptor.entry(peer.their_node_id.unwrap()) {
hash_map::Entry::Occupied(_) => {
+ log_trace!(self, "Got second connection with {}, closing", log_pubkey!(peer.their_node_id.unwrap()));
peer.their_node_id = None; // Unset so that we don't generate a peer_disconnected event
return Err(PeerHandleError{ no_connection_possible: false })
},
- hash_map::Entry::Vacant(entry) => entry.insert(peer_descriptor.clone()),
+ hash_map::Entry::Vacant(entry) => {
+ log_trace!(self, "Finished noise handshake for connection with {}", log_pubkey!(peer.their_node_id.unwrap()));
+ entry.insert(peer_descriptor.clone())
+ },
};
}
}
peer.pending_read_buffer = [0; 66].to_vec(); // act three is 66 bytes long
},
NextNoiseStep::ActTwo => {
- let act_three = try_potential_handleerror!(peer.channel_encryptor.process_act_two(&peer.pending_read_buffer[..], &self.our_node_secret)).to_vec();
- peer.pending_outbound_buffer.push_back(act_three);
+ let (act_three, their_node_id) = try_potential_handleerror!(peer.channel_encryptor.process_act_two(&peer.pending_read_buffer[..], &self.our_node_secret));
+ peer.pending_outbound_buffer.push_back(act_three.to_vec());
peer.pending_read_buffer = [0; 18].to_vec(); // Message length header is 18 bytes
peer.pending_read_is_header = true;
+ peer.their_node_id = Some(their_node_id);
insert_node_id!();
let mut local_features = msgs::LocalFeatures::new();
if self.initial_syncs_sent.load(Ordering::Acquire) < INITIAL_SYNCS_TO_SEND {
log_trace!(self, "Received message of type {} from {}", msg_type, log_pubkey!(peer.their_node_id.unwrap()));
if msg_type != 16 && peer.their_global_features.is_none() {
// Need an init message as first message
+ log_trace!(self, "Peer {} sent non-Init first message", log_pubkey!(peer.their_node_id.unwrap()));
return Err(PeerHandleError{ no_connection_possible: false });
}
let mut reader = ::std::io::Cursor::new(&msg_data[2..]);
if msg.local_features.supports_unknown_bits() { "present" } else { "none" },
if msg.global_features.supports_unknown_bits() { "present" } else { "none" });
+ if msg.local_features.initial_routing_sync() {
+ peer.sync_status = InitSyncTracker::ChannelsSyncing(0);
+ peers.peers_needing_send.insert(peer_descriptor.clone());
+ }
peer.their_global_features = Some(msg.global_features);
peer.their_local_features = Some(msg.local_features);
self.initial_syncs_sent.fetch_add(1, Ordering::AcqRel);
local_features.set_initial_routing_sync();
}
+
encode_and_send_msg!(msgs::Init {
global_features: msgs::GlobalFeatures::new(),
local_features,
// Channel control:
32 => {
let msg = try_potential_decodeerror!(msgs::OpenChannel::read(&mut reader));
- try_potential_handleerror!(self.message_handler.chan_handler.handle_open_channel(&peer.their_node_id.unwrap(), &msg));
+ try_potential_handleerror!(self.message_handler.chan_handler.handle_open_channel(&peer.their_node_id.unwrap(), peer.their_local_features.clone().unwrap(), &msg));
},
33 => {
let msg = try_potential_decodeerror!(msgs::AcceptChannel::read(&mut reader));
- try_potential_handleerror!(self.message_handler.chan_handler.handle_accept_channel(&peer.their_node_id.unwrap(), &msg));
+ try_potential_handleerror!(self.message_handler.chan_handler.handle_accept_channel(&peer.their_node_id.unwrap(), peer.their_local_features.clone().unwrap(), &msg));
},
34 => {
}
}
- Self::do_attempt_write_data(peer_descriptor, peer);
+ self.do_attempt_write_data(peer_descriptor, peer);
peer.pending_outbound_buffer.len() > 10 // pause_read
}
//TODO: Drop the pending channel? (or just let it timeout, but that sucks)
});
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 33)));
- Self::do_attempt_write_data(&mut descriptor, peer);
+ self.do_attempt_write_data(&mut descriptor, peer);
},
MessageSendEvent::SendOpenChannel { ref node_id, ref msg } => {
log_trace!(self, "Handling SendOpenChannel event in peer_handler for node {} for channel {}",
//TODO: Drop the pending channel? (or just let it timeout, but that sucks)
});
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 32)));
- Self::do_attempt_write_data(&mut descriptor, peer);
+ self.do_attempt_write_data(&mut descriptor, peer);
},
MessageSendEvent::SendFundingCreated { ref node_id, ref msg } => {
log_trace!(self, "Handling SendFundingCreated event in peer_handler for node {} for channel {} (which becomes {})",
//they should just throw away this funding transaction
});
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 34)));
- Self::do_attempt_write_data(&mut descriptor, peer);
+ self.do_attempt_write_data(&mut descriptor, peer);
},
MessageSendEvent::SendFundingSigned { ref node_id, ref msg } => {
log_trace!(self, "Handling SendFundingSigned event in peer_handler for node {} for channel {}",
//they should just throw away this funding transaction
});
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 35)));
- Self::do_attempt_write_data(&mut descriptor, peer);
+ self.do_attempt_write_data(&mut descriptor, peer);
},
MessageSendEvent::SendFundingLocked { ref node_id, ref msg } => {
log_trace!(self, "Handling SendFundingLocked event in peer_handler for node {} for channel {}",
//TODO: Do whatever we're gonna do for handling dropped messages
});
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 36)));
- Self::do_attempt_write_data(&mut descriptor, peer);
+ self.do_attempt_write_data(&mut descriptor, peer);
},
MessageSendEvent::SendAnnouncementSignatures { ref node_id, ref msg } => {
log_trace!(self, "Handling SendAnnouncementSignatures event in peer_handler for node {} for channel {})",
//they should just throw away this funding transaction
});
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 259)));
- Self::do_attempt_write_data(&mut descriptor, peer);
+ self.do_attempt_write_data(&mut descriptor, peer);
},
MessageSendEvent::UpdateHTLCs { ref node_id, updates: msgs::CommitmentUpdate { ref update_add_htlcs, ref update_fulfill_htlcs, ref update_fail_htlcs, ref update_fail_malformed_htlcs, ref update_fee, ref commitment_signed } } => {
log_trace!(self, "Handling UpdateHTLCs event in peer_handler for node {} with {} adds, {} fulfills, {} fails for channel {}",
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 134)));
}
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(commitment_signed, 132)));
- Self::do_attempt_write_data(&mut descriptor, peer);
+ self.do_attempt_write_data(&mut descriptor, peer);
},
MessageSendEvent::SendRevokeAndACK { ref node_id, ref msg } => {
log_trace!(self, "Handling SendRevokeAndACK event in peer_handler for node {} for channel {}",
//TODO: Do whatever we're gonna do for handling dropped messages
});
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 133)));
- Self::do_attempt_write_data(&mut descriptor, peer);
+ self.do_attempt_write_data(&mut descriptor, peer);
},
MessageSendEvent::SendClosingSigned { ref node_id, ref msg } => {
log_trace!(self, "Handling SendClosingSigned event in peer_handler for node {} for channel {}",
//TODO: Do whatever we're gonna do for handling dropped messages
});
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 39)));
- Self::do_attempt_write_data(&mut descriptor, peer);
+ self.do_attempt_write_data(&mut descriptor, peer);
},
MessageSendEvent::SendShutdown { ref node_id, ref msg } => {
log_trace!(self, "Handling Shutdown event in peer_handler for node {} for channel {}",
//TODO: Do whatever we're gonna do for handling dropped messages
});
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 38)));
- Self::do_attempt_write_data(&mut descriptor, peer);
+ self.do_attempt_write_data(&mut descriptor, peer);
},
MessageSendEvent::SendChannelReestablish { ref node_id, ref msg } => {
log_trace!(self, "Handling SendChannelReestablish event in peer_handler for node {} for channel {}",
//TODO: Do whatever we're gonna do for handling dropped messages
});
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 136)));
- Self::do_attempt_write_data(&mut descriptor, peer);
+ self.do_attempt_write_data(&mut descriptor, peer);
},
MessageSendEvent::BroadcastChannelAnnouncement { ref msg, ref update_msg } => {
log_trace!(self, "Handling BroadcastChannelAnnouncement event in peer_handler for short channel id {}", msg.contents.short_channel_id);
let encoded_update_msg = encode_msg!(update_msg, 258);
for (ref descriptor, ref mut peer) in peers.peers.iter_mut() {
- if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_global_features.is_none() {
+ if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_global_features.is_none() ||
+ !peer.should_forward_channel(msg.contents.short_channel_id) {
continue
}
match peer.their_node_id {
}
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_msg[..]));
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_update_msg[..]));
- Self::do_attempt_write_data(&mut (*descriptor).clone(), peer);
+ self.do_attempt_write_data(&mut (*descriptor).clone(), peer);
}
}
},
let encoded_msg = encode_msg!(msg, 258);
for (ref descriptor, ref mut peer) in peers.peers.iter_mut() {
- if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_global_features.is_none() {
+ if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_global_features.is_none() ||
+ !peer.should_forward_channel(msg.contents.short_channel_id) {
continue
}
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_msg[..]));
- Self::do_attempt_write_data(&mut (*descriptor).clone(), peer);
+ self.do_attempt_write_data(&mut (*descriptor).clone(), peer);
}
}
},
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 17)));
// This isn't guaranteed to work, but if there is enough free
// room in the send buffer, put the error message there...
- Self::do_attempt_write_data(&mut descriptor, &mut peer);
+ self.do_attempt_write_data(&mut descriptor, &mut peer);
} else {
log_trace!(self, "Handling DisconnectPeer HandleError event in peer_handler for node {} with no message", log_pubkey!(node_id));
}
//TODO: Do whatever we're gonna do for handling dropped messages
});
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 17)));
- Self::do_attempt_write_data(&mut descriptor, peer);
+ self.do_attempt_write_data(&mut descriptor, peer);
},
}
} else {
for mut descriptor in peers.peers_needing_send.drain() {
match peers.peers.get_mut(&descriptor) {
- Some(peer) => Self::do_attempt_write_data(&mut descriptor, peer),
+ Some(peer) => self.do_attempt_write_data(&mut descriptor, peer),
None => panic!("Inconsistent peers set state!"),
}
}
}
fn create_network(peer_count: usize) -> Vec<PeerManager<FileDescriptor>> {
- let secp_ctx = Secp256k1::new();
let mut peers = Vec::new();
let mut rng = thread_rng();
let logger : Arc<Logger> = Arc::new(test_utils::TestLogger::new());
let node_id = {
let mut key_slice = [0;32];
rng.fill_bytes(&mut key_slice);
- SecretKey::from_slice(&secp_ctx, &key_slice).unwrap()
+ SecretKey::from_slice(&key_slice).unwrap()
};
let msg_handler = MessageHandler { chan_handler: Arc::new(chan_handler), route_handler: Arc::new(router) };
let peer = PeerManager::new(msg_handler, node_id, Arc::clone(&logger));
#[test]
fn test_disconnect_peer() {
// Simple test which builds a network of PeerManager, connects and brings them to NoiseState::Finished and
- // push an DisconnectPeer event to remove the node flagged by id
+ // push a DisconnectPeer event to remove the node flagged by id
let mut peers = create_network(2);
establish_connection(&peers[0], &peers[1]);
assert_eq!(peers[0].peers.lock().unwrap().peers.len(), 1);