+ tokio::select! {
+ v = write_event.recv() => select_write_ev!(v),
+ read = reader.read(&mut buf) => match read {
+ Ok(0) => {
+ println!("Connection closed");
+ break;
+ },
+ Ok(len) => {
+ if let Some(blocker) = {
+ let mut lock = us.lock().unwrap();
+ if lock.disconnect {
+ shutdown_socket!("disconnect_socket() call from RL");
+ }
+ if lock.read_paused {
+ let (sender, blocker) = oneshot::channel();
+ lock.read_blocker = Some(sender);
+ Some(blocker)
+ } else { None }
+ } {
+ tokio::select! {
+ res = blocker => {
+ res.unwrap(); // We should never drop the sender without sending () into it!
+ if us.lock().unwrap().disconnect {
+ shutdown_socket!("disconnect_socket() call from RL");
+ }
+ },
+ v = write_event.recv() => select_write_ev!(v),
+ }
+ }
+ match peer_manager.read_event(&mut SocketDescriptor::new(Arc::clone(&us)), &buf[0..len]) {
+ Ok(pause_read) => {
+ if pause_read {
+ let mut lock = us.lock().unwrap();
+ lock.read_paused = true;
+ }
+
+ if let Err(mpsc::error::TrySendError::Full(_)) = us.lock().unwrap().event_notify.try_send(()) {
+ // Ignore full errors as we just need them to poll after this point, so if the user
+ // hasn't received the last send yet, it doesn't matter.
+ } else {
+ panic!();
+ }
+ },
+ Err(e) => shutdown_socket!(e),
+ }
+ },
+ Err(e) => {
+ println!("Connection closed: {}", e);
+ break;
+ },
+ },
+ }
+ }
+ let writer_option = us.lock().unwrap().writer.take();
+ if let Some(mut writer) = writer_option {
+ writer.shutdown().await.expect("We should be able to shutdown() a socket, even if it is already disconnected");
+ }
+ if us.lock().unwrap().need_disconnect_event {
+ peer_manager_ref.disconnect_event(&SocketDescriptor::new(Arc::clone(&us)));
+ if let Err(mpsc::error::TrySendError::Full(_)) = us.lock().unwrap().event_notify.try_send(()) {