From dc77eb35b84f54b6cdb4a65acb9584889eae714f Mon Sep 17 00:00:00 2001
From: Matt Corallo
Date: Mon, 3 Feb 2020 18:29:59 -0500
Subject: [PATCH] ICE demo

---
 lightning-net-tokio/src/lib.rs | 52 +++++++++++++++++++++++-----------
 1 file changed, 36 insertions(+), 16 deletions(-)

diff --git a/lightning-net-tokio/src/lib.rs b/lightning-net-tokio/src/lib.rs
index 6d992873a..6ee34e329 100644
--- a/lightning-net-tokio/src/lib.rs
+++ b/lightning-net-tokio/src/lib.rs
@@ -109,11 +109,13 @@ impl Connection {
 					lock.read_paused = true;
 				}
-				if let Err(mpsc::error::TrySendError::Full(_)) = us.lock().unwrap().event_notify.try_send(()) {
-					// Ignore full errors as we just need them to poll after this point, so if the user
-					// hasn't received the last send yet, it doesn't matter.
-				} else {
-					panic!();
+				match us.lock().unwrap().event_notify.try_send(()) {
+					Ok(_) => {},
+					Err(mpsc::error::TrySendError::Full(_)) => {
+						// Ignore full errors as we just need them to poll after this point, so if the user
+						// hasn't received the last send yet, it doesn't matter.
+					},
+					_ => panic!()
 				}
 			},
 			Err(e) => shutdown_socket!(e),
@@ -132,11 +134,13 @@ impl Connection {
 		}
 		if us.lock().unwrap().need_disconnect_event {
 			peer_manager_ref.disconnect_event(&SocketDescriptor::new(Arc::clone(&us)));
-			if let Err(mpsc::error::TrySendError::Full(_)) = us.lock().unwrap().event_notify.try_send(()) {
-				// Ignore full errors as we just need them to poll after this point, so if the user
-				// hasn't received the last send yet, it doesn't matter.
-			} else {
-				panic!();
+			match us.lock().unwrap().event_notify.try_send(()) {
+				Ok(_) => {},
+				Err(mpsc::error::TrySendError::Full(_)) => {
+					// Ignore full errors as we just need them to poll after this point, so if the user
+					// hasn't received the last send yet, it doesn't matter.
+				},
+				_ => panic!()
 			}
 		}
 	}
@@ -180,14 +184,15 @@ impl Connection {
 	pub async fn setup_outbound<CMH: ChannelMessageHandler + 'static>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>>>, event_notify: mpsc::Sender<()>, their_node_id: PublicKey, stream: TcpStream) {
 		let (reader, receiver, us) = Self::new(event_notify, stream);
 
+println!("Calling noc...");
 		if let Ok(initial_send) = peer_manager.new_outbound_connection(their_node_id, SocketDescriptor::new(us.clone())) {
-			if SocketDescriptor::new(us.clone()).send_data(&initial_send, true) == initial_send.len() {
-				tokio::spawn(Self::schedule_read(peer_manager, us, reader, receiver)).await;
-			} else {
+			if let Err(_) = us.lock().unwrap().writer.as_mut().unwrap().write_all(&initial_send).await {
 				// Note that we will skip disconnect_event here, in accordance with the PeerManager
 				// requirements, as disconnect_event is called by the schedule_read Future.
 				println!("Failed to write first full message to socket!");
+				return;
 			}
+			tokio::spawn(Self::schedule_read(peer_manager, us, reader, receiver)).await;
 		}
 	}
 
@@ -227,7 +232,8 @@ fn wake_socket_waker_by_ref(orig_ptr: *const ()) {
 	// fully write, but we only need to provide a write_event() once. Otherwise, the sending thread
 	// may have already gone away due to a socket close, in which case there's nothing to wake up
 	// anyway.
-	let _ = unsafe { (*descriptor).conn.lock() }.unwrap().write_avail.try_send(());
+println!("WAKE");
+	let _ = unsafe { (*descriptor).write_avail.clone() }.try_send(());
 }
 fn drop_socket_waker(orig_ptr: *const ()) {
 	let _orig_box = unsafe { Box::from_raw(orig_ptr as *mut SocketDescriptor) };
 }
@@ -241,18 +247,27 @@ fn descriptor_to_waker(descriptor: *const SocketDescriptor) -> task::RawWaker {
 
 pub struct SocketDescriptor {
 	conn: Arc<Mutex<Connection>>,
+	// Ideally we'd just lock conn and push to the write_avail there, but sadly tokio calls our
+	// waker irrespective of available space ;
+	write_avail: mpsc::Sender<()>,
 	id: u64,
 }
 impl SocketDescriptor {
 	fn new(conn: Arc<Mutex<Connection>>) -> Self {
-		let id = conn.lock().unwrap().id;
-		Self { conn, id }
+		let (id, write_avail) = {
+			let us = conn.lock().unwrap();
+			(us.id, us.write_avail.clone())
+		};
+		Self { conn, write_avail, id }
 	}
 }
 impl peer_handler::SocketDescriptor for SocketDescriptor {
 	fn send_data(&mut self, data: &[u8], resume_read: bool) -> usize {
+println!("locking...");
 		let mut us = self.conn.lock().unwrap();
+println!("done");
 		if us.writer.is_none() {
+println!("NO WRITER");
 			// The writer gets take()n when its time to shut down, so just fast-return 0 here.
 			return 0;
 		}
@@ -268,8 +283,10 @@ impl peer_handler::SocketDescriptor for SocketDescriptor {
 		let mut ctx = task::Context::from_waker(&waker);
 		let mut written_len = 0;
 		loop {
+println!("calling poll_write...");
 			match std::pin::Pin::new(us.writer.as_mut().unwrap()).poll_write(&mut ctx, &data[written_len..]) {
 				task::Poll::Ready(Ok(res)) => {
+println!("WRITE {}", res);
 					// The tokio docs *seem* to indicate this can't happen, and I certainly don't
 					// know how to handle it if it does (cause it should be a Poll::Pending
 					// instead):
@@ -278,6 +295,7 @@ impl peer_handler::SocketDescriptor for SocketDescriptor {
 					if written_len == data.len() { return written_len; }
 				},
 				task::Poll::Ready(Err(e)) => {
+println!("DEAD SOCKET {:?}", e);
 					// The tokio docs *seem* to indicate this can't happen, and I certainly don't
 					// know how to handle it if it does (cause it should be a Poll::Pending
 					// instead):
@@ -287,6 +305,7 @@ impl peer_handler::SocketDescriptor for SocketDescriptor {
 					return written_len;
 				},
 				task::Poll::Pending => {
+println!("PENDING WRITE NOW");
 					// We're queued up for a write event now, but we need to make sure we also
 					// pause read given we're now waiting on the remote end to ACK (and in
 					// accordance with the send_data() docs).
@@ -314,6 +333,7 @@ impl Clone for SocketDescriptor {
 	fn clone(&self) -> Self {
 		Self {
 			conn: Arc::clone(&self.conn),
+			write_avail: self.write_avail.clone(),
 			id: self.id,
 		}
 	}
-- 
2.39.5