/// announcements/updates for the given channel_id then we will send it when we get to that
/// point and we shouldn't send it yet to avoid sending duplicate updates. If we've already
/// sent the old versions, we should send the update, and so return true here.
- fn should_forward_channel(&self, channel_id: u64)->bool{
+ fn should_forward_channel_announcement(&self, channel_id: u64)->bool{
match self.sync_status {
InitSyncTracker::NoSyncRequested => true,
InitSyncTracker::ChannelsSyncing(i) => i < channel_id,
InitSyncTracker::NodesSyncing(_) => true,
}
}
+
+ /// Similar to the above, but for node announcements indexed by node_id.
+ fn should_forward_node_announcement(&self, node_id: PublicKey) -> bool {
+ match self.sync_status {
+ InitSyncTracker::NoSyncRequested => true,
+ InitSyncTracker::ChannelsSyncing(_) => false,
+ InitSyncTracker::NodesSyncing(pk) => pk < node_id,
+ }
+ }
}
struct PeerHolder<Descriptor: SocketDescriptor> {
/// lifetimes). Other times you can afford a reference, which is more efficient, in which case
/// SimpleRefPeerManager is the more appropriate type. Defining these type aliases prevents
/// issues such as overly long function definitions.
-pub type SimpleArcPeerManager<SD, M, T> = Arc<PeerManager<SD, SimpleArcChannelManager<M, T>>>;
+pub type SimpleArcPeerManager<SD, M, T, F> = Arc<PeerManager<SD, SimpleArcChannelManager<M, T, F>>>;
/// SimpleRefPeerManager is a type alias for a PeerManager reference, and is the reference
/// counterpart to the SimpleArcPeerManager type alias. Use this type by default when you don't
/// usage of lightning-net-tokio (since tokio::spawn requires parameters with static lifetimes).
/// But if this is not necessary, using a reference is more efficient. Defining these type aliases
/// helps with issues such as long function definitions.
-pub type SimpleRefPeerManager<'a, 'b, 'c, SD, M, T> = PeerManager<SD, SimpleRefChannelManager<'a, 'b, 'c, M, T>>;
+pub type SimpleRefPeerManager<'a, 'b, 'c, 'd, SD, M, T, F> = PeerManager<SD, SimpleRefChannelManager<'a, 'b, 'c, 'd, M, T, F>>;
/// A PeerManager manages a set of peers, described by their SocketDescriptor and marshalls socket
/// events into messages which it passes on to its MessageHandlers.
/// on this file descriptor has resume_read set (preventing DoS issues in the send buffer).
///
/// Panics if the descriptor was not previously registered in a new_*_connection event.
- pub fn read_event(&self, peer_descriptor: &mut Descriptor, data: Vec<u8>) -> Result<bool, PeerHandleError> {
+ pub fn read_event(&self, peer_descriptor: &mut Descriptor, data: &[u8]) -> Result<bool, PeerHandleError> {
match self.do_read_event(peer_descriptor, data) {
Ok(res) => Ok(res),
Err(e) => {
}
}
- fn do_read_event(&self, peer_descriptor: &mut Descriptor, data: Vec<u8>) -> Result<bool, PeerHandleError> {
+ fn do_read_event(&self, peer_descriptor: &mut Descriptor, data: &[u8]) -> Result<bool, PeerHandleError> {
let pause_read = {
let mut peers_lock = self.peers.lock().unwrap();
let peers = &mut *peers_lock;
log_debug!(self, "Deserialization failed due to shortness of message");
return Err(PeerHandleError { no_connection_possible: false });
}
- msgs::DecodeError::ExtraAddressesPerType => {
- log_debug!(self, "Error decoding message, ignoring due to lnd spec incompatibility. See https://github.com/lightningnetwork/lnd/issues/1407");
- continue;
- }
msgs::DecodeError::BadLengthDescriptor => return Err(PeerHandleError { no_connection_possible: false }),
msgs::DecodeError::Io(_) => return Err(PeerHandleError { no_connection_possible: false }),
}
// Unknown messages:
wire::Message::Unknown(msg_type) if msg_type.is_even() => {
+ log_debug!(self, "Received unknown even message of type {}, disconnecting peer!", msg_type);
// Fail the channel if message is an even, unknown type as per BOLT #1.
return Err(PeerHandleError{ no_connection_possible: true });
},
- wire::Message::Unknown(_) => {},
+ wire::Message::Unknown(msg_type) => {
+ log_trace!(self, "Received unknown odd message of type {}, ignoring", msg_type);
+ },
}
}
}
for (ref descriptor, ref mut peer) in peers.peers.iter_mut() {
if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_features.is_none() ||
- !peer.should_forward_channel(msg.contents.short_channel_id) {
+ !peer.should_forward_channel_announcement(msg.contents.short_channel_id) {
continue
}
match peer.their_node_id {
}
}
},
+ MessageSendEvent::BroadcastNodeAnnouncement { ref msg } => {
+ log_trace!(self, "Handling BroadcastNodeAnnouncement event in peer_handler");
+ if self.message_handler.route_handler.handle_node_announcement(msg).is_ok() {
+ let encoded_msg = encode_msg!(msg);
+
+ for (ref descriptor, ref mut peer) in peers.peers.iter_mut() {
+ if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_features.is_none() ||
+ !peer.should_forward_node_announcement(msg.contents.node_id) {
+ continue
+ }
+ peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_msg[..]));
+ self.do_attempt_write_data(&mut (*descriptor).clone(), peer);
+ }
+ }
+ },
MessageSendEvent::BroadcastChannelUpdate { ref msg } => {
log_trace!(self, "Handling BroadcastChannelUpdate event in peer_handler for short channel id {}", msg.contents.short_channel_id);
if self.message_handler.route_handler.handle_channel_update(msg).is_ok() {
for (ref descriptor, ref mut peer) in peers.peers.iter_mut() {
if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_features.is_none() ||
- !peer.should_forward_channel(msg.contents.short_channel_id) {
+ !peer.should_forward_channel_announcement(msg.contents.short_channel_id) {
continue
}
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_msg[..]));
descriptors_needing_disconnect.push(descriptor.clone());
match peer.their_node_id {
Some(node_id) => {
+ log_trace!(self, "Disconnecting peer with id {} due to ping timeout", node_id);
node_id_to_descriptor.remove(&node_id);
- self.message_handler.chan_handler.peer_disconnected(&node_id, true);
+ self.message_handler.chan_handler.peer_disconnected(&node_id, false);
}
- None => {}
+ None => {
+ // This can't actually happen as we should have hit
+ // is_ready_for_encryption() previously on this same peer.
+ unreachable!();
+ },
}
return false;
}
let mut fd_b = FileDescriptor { fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())) };
let initial_data = peer_b.new_outbound_connection(a_id, fd_b.clone()).unwrap();
peer_a.new_inbound_connection(fd_a.clone()).unwrap();
- assert_eq!(peer_a.read_event(&mut fd_a, initial_data).unwrap(), false);
- assert_eq!(peer_b.read_event(&mut fd_b, fd_a.outbound_data.lock().unwrap().split_off(0)).unwrap(), false);
- assert_eq!(peer_a.read_event(&mut fd_a, fd_b.outbound_data.lock().unwrap().split_off(0)).unwrap(), false);
+ assert_eq!(peer_a.read_event(&mut fd_a, &initial_data).unwrap(), false);
+ assert_eq!(peer_b.read_event(&mut fd_b, &fd_a.outbound_data.lock().unwrap().split_off(0)).unwrap(), false);
+ assert_eq!(peer_a.read_event(&mut fd_a, &fd_b.outbound_data.lock().unwrap().split_off(0)).unwrap(), false);
}
#[test]