use std::sync::atomic::{AtomicUsize, Ordering};
use std::{cmp,error,hash,fmt};
+use bitcoin_hashes::sha256::Hash as Sha256;
+use bitcoin_hashes::sha256::HashEngine as Sha256Engine;
+use bitcoin_hashes::{HashEngine, Hash};
+
/// Provides references to trait impls which handle different types of messages.
pub struct MessageHandler {
/// A message handler which handles messages specific to channels. Usually this is just a
/// careful to ensure you don't have races whereby you might register a new connection with an fd
/// the same as a yet-to-be-disconnect_event()-ed.
pub trait SocketDescriptor : cmp::Eq + hash::Hash + Clone {
- /// Attempts to send some data from the given Vec starting at the given offset to the peer.
+ /// Attempts to send some data from the given slice to the peer.
+ ///
/// Returns the amount of data which was sent, possibly 0 if the socket has since disconnected.
/// Note that in the disconnected case, a disconnect_event must still fire and further write
/// attempts may occur until that time.
///
- /// If the returned size is smaller than data.len() - write_offset, a write_available event must
+ /// If the returned size is smaller than data.len(), a write_available event must
/// trigger the next time more data can be written. Additionally, until the a send_data event
/// completes fully, no further read_events should trigger on the same peer!
///
/// events should be paused to prevent DoS in the send buffer), resume_read may be set
/// indicating that read events on this descriptor should resume. A resume_read of false does
/// *not* imply that further read events should be paused.
- fn send_data(&mut self, data: &Vec<u8>, write_offset: usize, resume_read: bool) -> usize;
+ fn send_data(&mut self, data: &[u8], resume_read: bool) -> usize;
/// Disconnect the socket pointed to by this SocketDescriptor. Once this function returns, no
/// more calls to write_event, read_event or disconnect_event may be made with this descriptor.
/// No disconnect_event should be generated as a result of this call, though obviously races
}
}
+#[cfg(not(any(target_pointer_width = "32", target_pointer_width = "64")))]
+fn _check_usize_is_32_or_64() {
+ // See below, less than 32 bit pointers may be unsafe here!
+ unsafe { mem::transmute::<*const usize, [u8; 4]>(panic!()); }
+}
+
/// A PeerManager manages a set of peers, described by their SocketDescriptor and marshalls socket
/// events into messages which it passes on to its MessageHandlers.
pub struct PeerManager<Descriptor: SocketDescriptor> {
message_handler: MessageHandler,
peers: Mutex<PeerHolder<Descriptor>>,
our_node_secret: SecretKey,
+ ephemeral_key_midstate: Sha256Engine,
+
+ // Usize needs to be at least 32 bits to avoid overflowing both low and high. If usize is 64
+ // bits we will never realistically count into high:
+ peer_counter_low: AtomicUsize,
+ peer_counter_high: AtomicUsize,
+
initial_syncs_sent: AtomicUsize,
logger: Arc<Logger>,
}
/// PeerIds may repeat, but only after disconnect_event() has been called.
impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
/// Constructs a new PeerManager with the given message handlers and node_id secret key
- pub fn new(message_handler: MessageHandler, our_node_secret: SecretKey, logger: Arc<Logger>) -> PeerManager<Descriptor> {
+ /// ephemeral_random_data is used to derive per-connection ephemeral keys and must be
+ /// cryptographically secure random bytes.
+ pub fn new(message_handler: MessageHandler, our_node_secret: SecretKey, ephemeral_random_data: &[u8; 32], logger: Arc<Logger>) -> PeerManager<Descriptor> {
+ let mut ephemeral_key_midstate = Sha256::engine();
+ ephemeral_key_midstate.input(ephemeral_random_data);
+
PeerManager {
message_handler: message_handler,
peers: Mutex::new(PeerHolder {
node_id_to_descriptor: HashMap::new()
}),
our_node_secret: our_node_secret,
+ ephemeral_key_midstate,
+ peer_counter_low: AtomicUsize::new(0),
+ peer_counter_high: AtomicUsize::new(0),
initial_syncs_sent: AtomicUsize::new(0),
logger,
}
}).collect()
}
+ fn get_ephemeral_key(&self) -> SecretKey {
+ let mut ephemeral_hash = self.ephemeral_key_midstate.clone();
+ let low = self.peer_counter_low.fetch_add(1, Ordering::AcqRel);
+ let high = if low == 0 {
+ self.peer_counter_high.fetch_add(1, Ordering::AcqRel)
+ } else {
+ self.peer_counter_high.load(Ordering::Acquire)
+ };
+ ephemeral_hash.input(&byte_utils::le64_to_array(low as u64));
+ ephemeral_hash.input(&byte_utils::le64_to_array(high as u64));
+ SecretKey::from_slice(&Sha256::from_engine(ephemeral_hash).into_inner()).expect("You broke SHA-256!")
+ }
+
/// Indicates a new outbound connection has been established to a node with the given node_id.
/// Note that if an Err is returned here you MUST NOT call disconnect_event for the new
/// descriptor but must disconnect the connection immediately.
/// Panics if descriptor is duplicative with some other descriptor which has not yet has a
/// disconnect_event.
pub fn new_outbound_connection(&self, their_node_id: PublicKey, descriptor: Descriptor) -> Result<Vec<u8>, PeerHandleError> {
- let mut peer_encryptor = PeerChannelEncryptor::new_outbound(their_node_id.clone());
+ let mut peer_encryptor = PeerChannelEncryptor::new_outbound(their_node_id.clone(), self.get_ephemeral_key());
let res = peer_encryptor.get_act_one().to_vec();
let pending_read_buffer = [0; 50].to_vec(); // Noise act two is 50 bytes
};
let should_be_reading = peer.pending_outbound_buffer.len() < MSG_BUFF_SIZE;
- let data_sent = descriptor.send_data(next_buff, peer.pending_outbound_buffer_first_msg_offset, should_be_reading);
+ let pending = &next_buff[peer.pending_outbound_buffer_first_msg_offset..];
+ let data_sent = descriptor.send_data(pending, should_be_reading);
peer.pending_outbound_buffer_first_msg_offset += data_sent;
if peer.pending_outbound_buffer_first_msg_offset == next_buff.len() { true } else { false }
} {
match $thing {
Ok(x) => x,
Err(e) => {
- if let Some(action) = e.action {
- match action {
- msgs::ErrorAction::DisconnectPeer { msg: _ } => {
- //TODO: Try to push msg
- log_trace!(self, "Got Err handling message, disconnecting peer because {}", e.err);
- return Err(PeerHandleError{ no_connection_possible: false });
- },
- msgs::ErrorAction::IgnoreError => {
- log_trace!(self, "Got Err handling message, ignoring because {}", e.err);
- continue;
- },
- msgs::ErrorAction::SendErrorMessage { msg } => {
- log_trace!(self, "Got Err handling message, sending Error message because {}", e.err);
- encode_and_send_msg!(msg, 17);
- continue;
- },
- }
- } else {
- log_debug!(self, "Got Err handling message, action not yet filled in: {}", e.err);
- return Err(PeerHandleError{ no_connection_possible: false });
+ match e.action {
+ msgs::ErrorAction::DisconnectPeer { msg: _ } => {
+ //TODO: Try to push msg
+ log_trace!(self, "Got Err handling message, disconnecting peer because {}", e.err);
+ return Err(PeerHandleError{ no_connection_possible: false });
+ },
+ msgs::ErrorAction::IgnoreError => {
+ log_trace!(self, "Got Err handling message, ignoring because {}", e.err);
+ continue;
+ },
+ msgs::ErrorAction::SendErrorMessage { msg } => {
+ log_trace!(self, "Got Err handling message, sending Error message because {}", e.err);
+ encode_and_send_msg!(msg, 17);
+ continue;
+ },
}
}
};
let next_step = peer.channel_encryptor.get_noise_step();
match next_step {
NextNoiseStep::ActOne => {
- let act_two = try_potential_handleerror!(peer.channel_encryptor.process_act_one_with_key(&peer.pending_read_buffer[..], &self.our_node_secret)).to_vec();
+ let act_two = try_potential_handleerror!(peer.channel_encryptor.process_act_one_with_keys(&peer.pending_read_buffer[..], &self.our_node_secret, self.get_ephemeral_key())).to_vec();
peer.pending_outbound_buffer.push_back(act_two);
peer.pending_read_buffer = [0; 66].to_vec(); // act three is 66 bytes long
},
log_info!(self, "Peer local features required unknown version bits");
return Err(PeerHandleError{ no_connection_possible: true });
}
- if msg.local_features.requires_data_loss_protect() {
- log_info!(self, "Peer local features required data_loss_protect");
- return Err(PeerHandleError{ no_connection_possible: true });
- }
if peer.their_global_features.is_some() {
return Err(PeerHandleError{ no_connection_possible: false });
}
self.message_handler.route_handler.handle_htlc_fail_channel_update(update);
},
MessageSendEvent::HandleError { ref node_id, ref action } => {
- if let Some(ref action) = *action {
- match *action {
- msgs::ErrorAction::DisconnectPeer { ref msg } => {
- if let Some(mut descriptor) = peers.node_id_to_descriptor.remove(node_id) {
- peers.peers_needing_send.remove(&descriptor);
- if let Some(mut peer) = peers.peers.remove(&descriptor) {
- if let Some(ref msg) = *msg {
- log_trace!(self, "Handling DisconnectPeer HandleError event in peer_handler for node {} with message {}",
- log_pubkey!(node_id),
- msg.data);
- peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 17)));
- // This isn't guaranteed to work, but if there is enough free
- // room in the send buffer, put the error message there...
- self.do_attempt_write_data(&mut descriptor, &mut peer);
- } else {
- log_trace!(self, "Handling DisconnectPeer HandleError event in peer_handler for node {} with no message", log_pubkey!(node_id));
- }
+ match *action {
+ msgs::ErrorAction::DisconnectPeer { ref msg } => {
+ if let Some(mut descriptor) = peers.node_id_to_descriptor.remove(node_id) {
+ peers.peers_needing_send.remove(&descriptor);
+ if let Some(mut peer) = peers.peers.remove(&descriptor) {
+ if let Some(ref msg) = *msg {
+ log_trace!(self, "Handling DisconnectPeer HandleError event in peer_handler for node {} with message {}",
+ log_pubkey!(node_id),
+ msg.data);
+ peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 17)));
+ // This isn't guaranteed to work, but if there is enough free
+ // room in the send buffer, put the error message there...
+ self.do_attempt_write_data(&mut descriptor, &mut peer);
+ } else {
+ log_trace!(self, "Handling DisconnectPeer HandleError event in peer_handler for node {} with no message", log_pubkey!(node_id));
}
- descriptor.disconnect_socket();
- self.message_handler.chan_handler.peer_disconnected(&node_id, false);
}
- },
- msgs::ErrorAction::IgnoreError => {},
- msgs::ErrorAction::SendErrorMessage { ref msg } => {
- log_trace!(self, "Handling SendErrorMessage HandleError event in peer_handler for node {} with message {}",
- log_pubkey!(node_id),
- msg.data);
- let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, {
- //TODO: Do whatever we're gonna do for handling dropped messages
- });
- peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 17)));
- self.do_attempt_write_data(&mut descriptor, peer);
- },
- }
- } else {
- log_error!(self, "Got no-action HandleError Event in peer_handler for node {}, no such events should ever be generated!", log_pubkey!(node_id));
+ descriptor.disconnect_socket();
+ self.message_handler.chan_handler.peer_disconnected(&node_id, false);
+ }
+ },
+ msgs::ErrorAction::IgnoreError => {},
+ msgs::ErrorAction::SendErrorMessage { ref msg } => {
+ log_trace!(self, "Handling SendErrorMessage HandleError event in peer_handler for node {} with message {}",
+ log_pubkey!(node_id),
+ msg.data);
+ let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, {
+ //TODO: Do whatever we're gonna do for handling dropped messages
+ });
+ peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 17)));
+ self.do_attempt_write_data(&mut descriptor, peer);
+ },
}
}
}
}
impl SocketDescriptor for FileDescriptor {
- fn send_data(&mut self, data: &Vec<u8>, write_offset: usize, _resume_read: bool) -> usize {
- assert!(write_offset < data.len());
- data.len() - write_offset
+ fn send_data(&mut self, data: &[u8], _resume_read: bool) -> usize {
+ data.len()
}
fn disconnect_socket(&mut self) {}
let mut peers = Vec::new();
let mut rng = thread_rng();
let logger : Arc<Logger> = Arc::new(test_utils::TestLogger::new());
+ let mut ephemeral_bytes = [0; 32];
+ rng.fill_bytes(&mut ephemeral_bytes);
for _ in 0..peer_count {
let chan_handler = test_utils::TestChannelMessageHandler::new();
SecretKey::from_slice(&key_slice).unwrap()
};
let msg_handler = MessageHandler { chan_handler: Arc::new(chan_handler), route_handler: Arc::new(router) };
- let peer = PeerManager::new(msg_handler, node_id, Arc::clone(&logger));
+ let peer = PeerManager::new(msg_handler, node_id, &ephemeral_bytes, Arc::clone(&logger));
peers.push(peer);
}
let chan_handler = test_utils::TestChannelMessageHandler::new();
chan_handler.pending_events.lock().unwrap().push(events::MessageSendEvent::HandleError {
node_id: their_id,
- action: Some(msgs::ErrorAction::DisconnectPeer { msg: None }),
+ action: msgs::ErrorAction::DisconnectPeer { msg: None },
});
assert_eq!(chan_handler.pending_events.lock().unwrap().len(), 1);
peers[0].message_handler.chan_handler = Arc::new(chan_handler);