ICE demo 2020-02-rustc-ice
authorMatt Corallo <git@bluematt.me>
Mon, 3 Feb 2020 23:29:59 +0000 (18:29 -0500)
committerMatt Corallo <git@bluematt.me>
Mon, 3 Feb 2020 23:29:59 +0000 (18:29 -0500)
lightning-net-tokio/src/lib.rs

index 6d992873a16a2ee33eecdd24f757cd59ce1b65ca..6ee34e329f4d02d1c3ae824c5d53c0ca0f2087fc 100644 (file)
@@ -109,11 +109,13 @@ impl Connection {
                                                                        lock.read_paused = true;
                                                                }
 
-                                                               if let Err(mpsc::error::TrySendError::Full(_)) = us.lock().unwrap().event_notify.try_send(()) {
-                                                                       // Ignore full errors as we just need them to poll after this point, so if the user
-                                                                       // hasn't received the last send yet, it doesn't matter.
-                                                               } else {
-                                                                       panic!();
+                                                               match us.lock().unwrap().event_notify.try_send(()) {
+                                                                       Ok(_) => {},
+                                                                       Err(mpsc::error::TrySendError::Full(_)) => {
+                                                                               // Ignore full errors as we just need them to poll after this point, so if the user
+                                                                               // hasn't received the last send yet, it doesn't matter.
+                                                                       },
+                                                                       _ => panic!()
                                                                }
                                                        },
                                                        Err(e) => shutdown_socket!(e),
@@ -132,11 +134,13 @@ impl Connection {
                }
                if us.lock().unwrap().need_disconnect_event {
                        peer_manager_ref.disconnect_event(&SocketDescriptor::new(Arc::clone(&us)));
-                       if let Err(mpsc::error::TrySendError::Full(_)) = us.lock().unwrap().event_notify.try_send(()) {
-                               // Ignore full errors as we just need them to poll after this point, so if the user
-                               // hasn't received the last send yet, it doesn't matter.
-                       } else {
-                               panic!();
+                       match us.lock().unwrap().event_notify.try_send(()) {
+                               Ok(_) => {},
+                               Err(mpsc::error::TrySendError::Full(_)) => {
+                                       // Ignore full errors as we just need them to poll after this point, so if the user
+                                       // hasn't received the last send yet, it doesn't matter.
+                               },
+                               _ => panic!()
                        }
                }
        }
@@ -180,14 +184,15 @@ impl Connection {
        pub async fn setup_outbound<CMH: ChannelMessageHandler + 'static>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>>>, event_notify: mpsc::Sender<()>, their_node_id: PublicKey, stream: TcpStream) {
                let (reader, receiver, us) = Self::new(event_notify, stream);
 
+println!("Calling noc...");
                if let Ok(initial_send) = peer_manager.new_outbound_connection(their_node_id, SocketDescriptor::new(us.clone())) {
-                       if SocketDescriptor::new(us.clone()).send_data(&initial_send, true) == initial_send.len() {
-                               tokio::spawn(Self::schedule_read(peer_manager, us, reader, receiver)).await;
-                       } else {
+                       if let Err(_) = us.lock().unwrap().writer.as_mut().unwrap().write_all(&initial_send).await {
                                // Note that we will skip disconnect_event here, in accordance with the PeerManager
                                // requirements, as disconnect_event is called by the schedule_read Future.
                                println!("Failed to write first full message to socket!");
+                               return;
                        }
+                       tokio::spawn(Self::schedule_read(peer_manager, us, reader, receiver)).await;
                }
        }
 
@@ -227,7 +232,8 @@ fn wake_socket_waker_by_ref(orig_ptr: *const ()) {
        // fully write, but we only need to provide a write_event() once. Otherwise, the sending thread
        // may have already gone away due to a socket close, in which case there's nothing to wake up
        // anyway.
-       let _ = unsafe { (*descriptor).conn.lock() }.unwrap().write_avail.try_send(());
+println!("WAKE");
+       let _ = unsafe { (*descriptor).write_avail.clone() }.try_send(());
 }
 fn drop_socket_waker(orig_ptr: *const ()) {
        let _orig_box = unsafe { Box::from_raw(orig_ptr as *mut SocketDescriptor) };
@@ -241,18 +247,27 @@ fn descriptor_to_waker(descriptor: *const SocketDescriptor) -> task::RawWaker {
 
 pub struct SocketDescriptor {
        conn: Arc<Mutex<Connection>>,
+       // Ideally we'd just lock conn and push to the write_avail there, but sadly tokio calls our
+       // waker irrespective of available space ;
+       write_avail: mpsc::Sender<()>,
        id: u64,
 }
 impl SocketDescriptor {
        fn new(conn: Arc<Mutex<Connection>>) -> Self {
-               let id = conn.lock().unwrap().id;
-               Self { conn, id }
+               let (id, write_avail) = {
+                       let us = conn.lock().unwrap();
+                       (us.id, us.write_avail.clone())
+               };
+               Self { conn, write_avail, id }
        }
 }
 impl peer_handler::SocketDescriptor for SocketDescriptor {
        fn send_data(&mut self, data: &[u8], resume_read: bool) -> usize {
+println!("locking...");
                let mut us = self.conn.lock().unwrap();
+println!("done");
                if us.writer.is_none() {
+println!("NO WRITER");
                        // The writer gets take()n when its time to shut down, so just fast-return 0 here.
                        return 0;
                }
@@ -268,8 +283,10 @@ impl peer_handler::SocketDescriptor for SocketDescriptor {
                let mut ctx = task::Context::from_waker(&waker);
                let mut written_len = 0;
                loop {
+println!("calling poll_write...");
                        match std::pin::Pin::new(us.writer.as_mut().unwrap()).poll_write(&mut ctx, &data[written_len..]) {
                                task::Poll::Ready(Ok(res)) => {
+println!("WRITE {}", res);
                                        // The tokio docs *seem* to indicate this can't happen, and I certainly don't
                                        // know how to handle it if it does (cause it should be a Poll::Pending
                                        // instead):
@@ -278,6 +295,7 @@ impl peer_handler::SocketDescriptor for SocketDescriptor {
                                        if written_len == data.len() { return written_len; }
                                },
                                task::Poll::Ready(Err(e)) => {
+println!("DEAD SOCKET {:?}", e);
                                        // The tokio docs *seem* to indicate this can't happen, and I certainly don't
                                        // know how to handle it if it does (cause it should be a Poll::Pending
                                        // instead):
@@ -287,6 +305,7 @@ impl peer_handler::SocketDescriptor for SocketDescriptor {
                                        return written_len;
                                },
                                task::Poll::Pending => {
+println!("PENDING WRITE NOW");
                                        // We're queued up for a write event now, but we need to make sure we also
                                        // pause read given we're now waiting on the remote end to ACK (and in
                                        // accordance with the send_data() docs).
@@ -314,6 +333,7 @@ impl Clone for SocketDescriptor {
        fn clone(&self) -> Self {
                Self {
                        conn: Arc::clone(&self.conn),
+                       write_avail: self.write_avail.clone(),
                        id: self.id,
                }
        }