lock.read_paused = true;
}
- if let Err(mpsc::error::TrySendError::Full(_)) = us.lock().unwrap().event_notify.try_send(()) {
- // Ignore full errors as we just need them to poll after this point, so if the user
- // hasn't received the last send yet, it doesn't matter.
- } else {
- panic!();
+ match us.lock().unwrap().event_notify.try_send(()) {
+ Ok(_) => {},
+ Err(mpsc::error::TrySendError::Full(_)) => {
+ // Ignore full errors as we just need them to poll after this point, so if the user
+ // hasn't received the last send yet, it doesn't matter.
+ },
+ // The only other error is `Closed`, meaning the user dropped the event
+ // receiver and can never be notified again; treat that as fatal.
+ _ => panic!()
}
},
Err(e) => shutdown_socket!(e),
}
if us.lock().unwrap().need_disconnect_event {
peer_manager_ref.disconnect_event(&SocketDescriptor::new(Arc::clone(&us)));
- if let Err(mpsc::error::TrySendError::Full(_)) = us.lock().unwrap().event_notify.try_send(()) {
- // Ignore full errors as we just need them to poll after this point, so if the user
- // hasn't received the last send yet, it doesn't matter.
- } else {
- panic!();
+ match us.lock().unwrap().event_notify.try_send(()) {
+ Ok(_) => {},
+ Err(mpsc::error::TrySendError::Full(_)) => {
+ // Ignore full errors as we just need them to poll after this point, so if the user
+ // hasn't received the last send yet, it doesn't matter.
+ },
+ // As above, `Closed` means the user dropped the event receiver; panic as
+ // we can no longer notify them of events.
+ _ => panic!()
}
}
}
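+ /// Sets up a new outbound connection: registers the peer with the PeerManager,
+ /// writes the initial handshake bytes that registration returns, then spawns
+ /// schedule_read to drive the connection from there.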
pub async fn setup_outbound<CMH: ChannelMessageHandler + 'static>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>>>, event_notify: mpsc::Sender<()>, their_node_id: PublicKey, stream: TcpStream) {
let (reader, receiver, us) = Self::new(event_notify, stream);
+println!("Calling noc...");
if let Ok(initial_send) = peer_manager.new_outbound_connection(their_node_id, SocketDescriptor::new(us.clone())) {
- if SocketDescriptor::new(us.clone()).send_data(&initial_send, true) == initial_send.len() {
- tokio::spawn(Self::schedule_read(peer_manager, us, reader, receiver)).await;
- } else {
+ // Take the writer out of the Connection so the conn Mutex isn't held across the
+ // await below; the guard would otherwise stay held while this future is parked,
+ // deadlocking anyone calling send_data(). Nothing else should be touching the
+ // writer yet, as schedule_read hasn't been spawned.
+ let mut writer = us.lock().unwrap().writer.take().unwrap();
+ if writer.write_all(&initial_send).await.is_err() {
// Note that we will skip disconnect_event here, in accordance with the PeerManager
// requirements, as disconnect_event is called by the schedule_read Future.
println!("Failed to write first full message to socket!");
+ return;
}
+ us.lock().unwrap().writer = Some(writer);
+ tokio::spawn(Self::schedule_read(peer_manager, us, reader, receiver)).await;
}
}
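+ // A minimal usage sketch for the above (illustrative only; the address, channel
+ // capacity, and surrounding setup are assumptions, not part of this code):
+ //
+ //     let (event_notify, mut events) = mpsc::channel(1);
+ //     let stream = TcpStream::connect("127.0.0.1:9735").await.unwrap();
+ //     setup_outbound(peer_manager, event_notify, their_node_id, stream).await;
+ //     // ...with another task receiving from `events` and processing new messages.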
// fully write, but we only need to provide a write_event() once. Otherwise, the sending thread
// may have already gone away due to a socket close, in which case there's nothing to wake up
// anyway.
- let _ = unsafe { (*descriptor).conn.lock() }.unwrap().write_avail.try_send(());
+println!("WAKE");
+ let _ = unsafe { (*descriptor).write_avail.clone() }.try_send(());
}
fn drop_socket_waker(orig_ptr: *const ()) {
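+ // Reconstruct and drop the Box backing this waker, releasing its clones of the
+ // connection Arc and the write_avail Sender.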
let _orig_box = unsafe { Box::from_raw(orig_ptr as *mut SocketDescriptor) };
pub struct SocketDescriptor {
conn: Arc<Mutex<Connection>>,
+ // Ideally we'd just lock conn and push to the write_avail there, but sadly tokio
+ // calls our waker irrespective of available space, so taking the conn lock here
+ // could deadlock against a send_data() call that already holds it.
+ write_avail: mpsc::Sender<()>,
id: u64,
}
impl SocketDescriptor {
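+ // Snapshot the id and a write_avail sender under a single lock at construction;
+ // the waker can then signal write_avail without ever taking the conn lock.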
fn new(conn: Arc<Mutex<Connection>>) -> Self {
- let id = conn.lock().unwrap().id;
- Self { conn, id }
+ let (id, write_avail) = {
+ let us = conn.lock().unwrap();
+ (us.id, us.write_avail.clone())
+ };
+ Self { conn, write_avail, id }
}
}
impl peer_handler::SocketDescriptor for SocketDescriptor {
fn send_data(&mut self, data: &[u8], resume_read: bool) -> usize {
+println!("locking...");
let mut us = self.conn.lock().unwrap();
+println!("done");
if us.writer.is_none() {
+println!("NO WRITER");
// The writer gets take()n when it's time to shut down, so just fast-return 0 here.
return 0;
}
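+ // Build a Context around our custom waker so that poll_write, on Pending, will
+ // arrange for wake_socket_waker to fire (and thus write_avail to be signaled)
+ // once the socket becomes writable again.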
let mut ctx = task::Context::from_waker(&waker);
let mut written_len = 0;
loop {
+println!("calling poll_write...");
match std::pin::Pin::new(us.writer.as_mut().unwrap()).poll_write(&mut ctx, &data[written_len..]) {
task::Poll::Ready(Ok(res)) => {
+println!("WRITE {}", res);
// The tokio docs *seem* to indicate this can't happen, and I certainly don't
// know how to handle it if it does (cause it should be a Poll::Pending
// instead):
if written_len == data.len() { return written_len; }
},
task::Poll::Ready(Err(e)) => {
+println!("DEAD SOCKET {:?}", e);
// The tokio docs *seem* to indicate this can't happen, and I certainly don't
// know how to handle it if it does (cause it should be a Poll::Pending
// instead):
return written_len;
},
task::Poll::Pending => {
+println!("PENDING WRITE NOW");
// We're queued up for a write event now, but we need to make sure we also
// pause read given we're now waiting on the remote end to ACK (and in
// accordance with the send_data() docs).
fn clone(&self) -> Self {
Self {
conn: Arc::clone(&self.conn),
+ write_avail: self.write_avail.clone(),
id: self.id,
}
}