From: Matt Corallo Date: Mon, 3 May 2021 01:04:49 +0000 (+0000) Subject: Slightly reduce race conditions in NioPeerHandler X-Git-Tag: v0.0.98~1^2~12 X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=commitdiff_plain;h=f256fb83cedebee1ce61c7a7123bf1fe4232c641;p=ldk-java Slightly reduce race conditions in NioPeerHandler Sadly there's some restructuring that needs to happen to PeerHandler before all possible race conditions go away. --- diff --git a/src/main/java/org/ldk/batteries/NioPeerHandler.java b/src/main/java/org/ldk/batteries/NioPeerHandler.java index f93d1652..f0a38282 100644 --- a/src/main/java/org/ldk/batteries/NioPeerHandler.java +++ b/src/main/java/org/ldk/batteries/NioPeerHandler.java @@ -1,17 +1,12 @@ package org.ldk.batteries; -import org.ldk.impl.bindings; import org.ldk.structs.*; import java.io.IOException; import java.net.SocketAddress; import java.net.StandardSocketOptions; import java.nio.ByteBuffer; -import java.nio.channels.SelectionKey; -import java.nio.channels.Selector; -import java.nio.channels.ServerSocketChannel; -import java.nio.channels.SocketChannel; -import java.util.concurrent.Callable; +import java.nio.channels.*; /** * A NioPeerHandler maps LDK's PeerHandler to Java's NIO I/O interface. It spawns a single background thread which @@ -25,6 +20,8 @@ public class NioPeerHandler { // This is set to true if we're in such a condition (with disconnect checked // before with the Peer monitor lock held) and false when we can return. boolean block_disconnect_socket = false; + // Indicates LDK told us to disconnect this peer, and thus we should not call socket_disconnected. + boolean disconnect_requested = false; SelectionKey key; } @@ -85,6 +82,9 @@ public class NioPeerHandler { @Override public void disconnect_socket() { + synchronized (peer) { + peer.disconnect_requested = true; + } try { do_selector_action(() -> { peer.key.cancel(); @@ -146,67 +146,75 @@ public class NioPeerHandler { if (shutdown) return; if (Thread.interrupted()) return; for (SelectionKey key : this.selector.selectedKeys()) { - if ((key.interestOps() & SelectionKey.OP_ACCEPT) != 0) { - if (key.isAcceptable()) { - SocketChannel chan; - try { - chan = ((ServerSocketChannel) key.channel()).accept(); - } catch (IOException ignored) { - key.cancel(); - continue; - } - if (chan == null) continue; - try { - Peer peer = setup_socket(chan); - Result_NonePeerHandleErrorZ res = this.peer_manager.new_inbound_connection(peer.descriptor); - if (res instanceof Result_NonePeerHandleErrorZ.Result_NonePeerHandleErrorZ_OK) { - peer.key = chan.register(this.selector, SelectionKey.OP_READ, peer); - } - } catch (IOException ignored) { } - } - continue; // There is no attachment so the rest of the loop is useless - } - Peer peer = (Peer) key.attachment(); - synchronized (peer) { - peer.block_disconnect_socket = true; - } try { - if (key.isValid() && (key.interestOps() & SelectionKey.OP_WRITE) != 0 && key.isWritable()) { - Result_NonePeerHandleErrorZ res = this.peer_manager.write_buffer_space_avail(peer.descriptor); - if (res instanceof Result_NonePeerHandleErrorZ.Result_NonePeerHandleErrorZ_Err) { - key.channel().close(); - key.cancel(); + if ((key.interestOps() & SelectionKey.OP_ACCEPT) != 0) { + if (key.isAcceptable()) { + SocketChannel chan; + try { + chan = ((ServerSocketChannel) key.channel()).accept(); + } catch (IOException ignored) { + key.cancel(); + continue; + } + if (chan == null) continue; + try { + Peer peer = setup_socket(chan); + Result_NonePeerHandleErrorZ res = this.peer_manager.new_inbound_connection(peer.descriptor); + if (res instanceof Result_NonePeerHandleErrorZ.Result_NonePeerHandleErrorZ_OK) { + peer.key = chan.register(this.selector, SelectionKey.OP_READ, peer); + } + } catch (IOException ignored) { } } + continue; // There is no attachment so the rest of the loop is useless } - if (key.isValid() && (key.interestOps() & SelectionKey.OP_READ) != 0 && key.isReadable()) { - buf.clear(); - int read = ((SocketChannel) key.channel()).read(buf); - if (read == -1) { - this.peer_manager.socket_disconnected(peer.descriptor); - key.cancel(); - } else if (read > 0) { - buf.flip(); - byte[] read_bytes = new byte[read]; - buf.get(read_bytes, 0, read); - Result_boolPeerHandleErrorZ res = this.peer_manager.read_event(peer.descriptor, read_bytes); - if (res instanceof Result_boolPeerHandleErrorZ.Result_boolPeerHandleErrorZ_OK) { - if (((Result_boolPeerHandleErrorZ.Result_boolPeerHandleErrorZ_OK) res).res) { - key.interestOps(key.interestOps() & (~SelectionKey.OP_READ)); - } - } else { + Peer peer = (Peer) key.attachment(); + synchronized (peer) { + if (peer.disconnect_requested) + continue; + peer.block_disconnect_socket = true; + } + try { + if (key.isValid() && (key.interestOps() & SelectionKey.OP_WRITE) != 0 && key.isWritable()) { + Result_NonePeerHandleErrorZ res = this.peer_manager.write_buffer_space_avail(peer.descriptor); + if (res instanceof Result_NonePeerHandleErrorZ.Result_NonePeerHandleErrorZ_Err) { key.channel().close(); key.cancel(); } } + if (key.isValid() && (key.interestOps() & SelectionKey.OP_READ) != 0 && key.isReadable()) { + buf.clear(); + int read = ((SocketChannel) key.channel()).read(buf); + if (read == -1) { + this.peer_manager.socket_disconnected(peer.descriptor); + key.cancel(); + } else if (read > 0) { + buf.flip(); + byte[] read_bytes = new byte[read]; + buf.get(read_bytes, 0, read); + Result_boolPeerHandleErrorZ res = this.peer_manager.read_event(peer.descriptor, read_bytes); + if (res instanceof Result_boolPeerHandleErrorZ.Result_boolPeerHandleErrorZ_OK) { + if (((Result_boolPeerHandleErrorZ.Result_boolPeerHandleErrorZ_OK) res).res) { + key.interestOps(key.interestOps() & (~SelectionKey.OP_READ)); + } + } else { + key.channel().close(); + key.cancel(); + } + } + } + } catch (IOException ignored) { + try { key.channel().close(); } catch (IOException ignored2) { } + key.cancel(); + peer_manager.socket_disconnected(peer.descriptor); } - } catch (IOException ignored) { - try { key.channel().close(); } catch (IOException ignored2) { } - key.cancel(); - peer_manager.socket_disconnected(peer.descriptor); - } - synchronized (peer) { - peer.block_disconnect_socket = false; - peer.notifyAll(); + synchronized (peer) { + peer.block_disconnect_socket = false; + peer.notifyAll(); + } + } catch (CancelledKeyException e) { + try { key.channel().close(); } catch (IOException ignored) { } + // The key is only cancelled when we have notified the PeerManager that the socket is closed, so + // no need to do anything here with the PeerManager. } } if (lastTimerTick < System.currentTimeMillis() - 30 * 1000) {