Fix long-standing race in net-tokio reading after a disconnect event
authorMatt Corallo <git@bluematt.me>
Mon, 24 Feb 2020 19:17:04 +0000 (14:17 -0500)
committerMatt Corallo <git@bluematt.me>
Sun, 1 Mar 2020 04:26:15 +0000 (23:26 -0500)
If rust-lightning tells us to disconnect a socket after we read
some bytes from the socket, but before we actually give those bytes
to rust-lightning, we may end up calling rust-lightning with a
Descriptor that isn't registered anymore.

Sadly, there really isn't a good way to solve this, and it should
be a pretty quick event, so we just busy-wait.

lightning-net-tokio/src/lib.rs

index 5a2860b005af93c92798ae3b866c6c65733a3453..b04e93f74496f33d801bb9261449929b2c5b1308 100644 (file)
@@ -70,7 +70,7 @@ use lightning::ln::peer_handler;
 use lightning::ln::peer_handler::SocketDescriptor as LnSocketTrait;
 use lightning::ln::msgs::ChannelMessageHandler;
 
-use std::task;
+use std::{task, thread};
 use std::net::SocketAddr;
 use std::sync::{Arc, Mutex};
 use std::sync::atomic::{AtomicU64, Ordering};
@@ -111,6 +111,11 @@ struct Connection {
        // socket. To wake it up (without otherwise changing its state, we can push a value into this
        // Sender.
        read_waker: mpsc::Sender<()>,
+       // When we are told by rust-lightning to disconnect, we can't return to rust-lightning until we
+       // are sure we won't call any more read/write PeerManager functions with the same connection.
+       // This is set to true if we're in such a condition (with disconnect checked before with the
+       // top-level mutex held) and false when we can return.
+       block_disconnect_socket: bool,
        read_paused: bool,
        disconnect_state: DisconnectionState,
        id: u64,
@@ -128,17 +133,26 @@ impl Connection {
                                } }
                        }
 
+                       macro_rules! prepare_read_write_call {
+                               () => { {
+                                       let mut us_lock = us.lock().unwrap();
+                                       if us_lock.disconnect_state == DisconnectionState::RLTriggeredDisconnect {
+                                               shutdown_socket!("disconnect_socket() call from RL");
+                                       }
+                                       us_lock.block_disconnect_socket = true;
+                               } }
+                       }
+
                        let read_paused = us.lock().unwrap().read_paused;
                        tokio::select! {
                                v = write_avail_receiver.recv() => {
                                        assert!(v.is_some()); // We can't have dropped the sending end, its in the us Arc!
-                                       if us.lock().unwrap().disconnect_state == DisconnectionState::RLTriggeredDisconnect {
-                                               shutdown_socket!("disconnect_socket() call from RL");
-                                       }
+                                       prepare_read_write_call!();
                                        if let Err(e) = peer_manager.write_buffer_space_avail(&mut SocketDescriptor::new(us.clone())) {
                                                us.lock().unwrap().disconnect_state = DisconnectionState::RLTriggeredDisconnect;
                                                shutdown_socket!(e);
                                        }
+                                       us.lock().unwrap().block_disconnect_socket = false;
                                },
                                _ = read_wake_receiver.recv() => {},
                                read = reader.read(&mut buf), if !read_paused => match read {
@@ -147,9 +161,7 @@ impl Connection {
                                                break;
                                        },
                                        Ok(len) => {
-                                               if us.lock().unwrap().disconnect_state == DisconnectionState::RLTriggeredDisconnect {
-                                                       shutdown_socket!("disconnect_socket() call from RL");
-                                               }
+                                               prepare_read_write_call!();
                                                match peer_manager.read_event(&mut SocketDescriptor::new(Arc::clone(&us)), &buf[0..len]) {
                                                        Ok(pause_read) => {
                                                                if pause_read {
@@ -171,6 +183,7 @@ impl Connection {
                                                                shutdown_socket!(e)
                                                        },
                                                }
+                                               us.lock().unwrap().block_disconnect_socket = false;
                                        },
                                        Err(e) => {
                                                println!("Connection closed: {}", e);
@@ -179,6 +192,7 @@ impl Connection {
                                },
                        }
                }
+               us.lock().unwrap().block_disconnect_socket = false;
                let writer_option = us.lock().unwrap().writer.take();
                if let Some(mut writer) = writer_option {
                        // If the socket is already closed, shutdown() will fail, so just ignore it.
@@ -212,8 +226,8 @@ impl Connection {
 
                (reader, write_receiver, read_receiver,
                Arc::new(Mutex::new(Self {
-                       writer: Some(writer), event_notify, write_avail, read_waker,
-                       read_paused: false, disconnect_state: DisconnectionState::NeedDisconnectEvent,
+                       writer: Some(writer), event_notify, write_avail, read_waker, read_paused: false,
+                       block_disconnect_socket: false, disconnect_state: DisconnectionState::NeedDisconnectEvent,
                        id: ID_COUNTER.fetch_add(1, Ordering::AcqRel)
                })))
        }
@@ -400,15 +414,18 @@ impl peer_handler::SocketDescriptor for SocketDescriptor {
        }
 
        fn disconnect_socket(&mut self) {
-               let mut us = self.conn.lock().unwrap();
-               us.disconnect_state = DisconnectionState::RLTriggeredDisconnect;
-               us.read_paused = true;
-               // Wake up the sending thread, assuming it is still alive
-               let _ = us.write_avail.try_send(());
-               // TODO: There's a race where we don't meet the requirements of disconnect_socket if the
-               // read task is about to call a PeerManager function (eg read_event or write_event).
-               // Ideally we need to release the us lock and block until we have confirmation from the
-               // read task that it has broken out of its main loop.
+               {
+                       let mut us = self.conn.lock().unwrap();
+                       us.disconnect_state = DisconnectionState::RLTriggeredDisconnect;
+                       us.read_paused = true;
+                       // Wake up the sending thread, assuming it is still alive
+                       let _ = us.write_avail.try_send(());
+                       // Happy-path return:
+                       if !us.block_disconnect_socket { return; }
+               }
+               while self.conn.lock().unwrap().block_disconnect_socket {
+                       thread::yield_now();
+               }
        }
 }
 impl Clone for SocketDescriptor {