X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Forg%2Fldk%2Fbatteries%2FNioPeerHandler.java;h=db0fa82b130e7ae7dd6635145d5112bc8b6ebb90;hb=7bd26ccdf638a248e8d45ab0feaa954c20c59f11;hp=299496229ad73980914ad71ce282fdde1e0842b5;hpb=3dcd430967b250ca9020d0f57a7fa5f8f5904aa9;p=ldk-java diff --git a/src/main/java/org/ldk/batteries/NioPeerHandler.java b/src/main/java/org/ldk/batteries/NioPeerHandler.java index 29949622..db0fa82b 100644 --- a/src/main/java/org/ldk/batteries/NioPeerHandler.java +++ b/src/main/java/org/ldk/batteries/NioPeerHandler.java @@ -5,12 +5,14 @@ import org.ldk.structs.*; import java.io.IOException; import java.net.SocketAddress; import java.net.StandardSocketOptions; +import java.nio.Buffer; import java.nio.ByteBuffer; -import java.nio.channels.SelectionKey; -import java.nio.channels.Selector; -import java.nio.channels.ServerSocketChannel; -import java.nio.channels.SocketChannel; +import java.nio.channels.*; +/** + * A NioPeerHandler maps LDK's PeerHandler to Java's NIO I/O interface. It spawns a single background thread which + * processes socket events and provides the data to LDK for decryption and processing. + */ public class NioPeerHandler { private static class Peer { SocketDescriptor descriptor; @@ -19,9 +21,36 @@ public class NioPeerHandler { // This is set to true if we're in such a condition (with disconnect checked // before with the Peer monitor lock held) and false when we can return. boolean block_disconnect_socket = false; + // Indicates LDK told us to disconnect this peer, and thus we should not call socket_disconnected. + boolean disconnect_requested = false; SelectionKey key; } + // Android's java.nio implementation has a big lock inside the selector, preventing any concurrent access to it. + // This appears to largely defeat the entire purpose of java.nio, but we work around it here by explicitly checking + // for an Android environment and passing any selector access on any thread other than our internal one through + // do_selector_action, which wakes up the selector before accessing it. + private static boolean IS_ANDROID; + static { + IS_ANDROID = System.getProperty("java.vendor").toLowerCase().contains("android"); + } + private boolean wakeup_selector = false; + private interface SelectorCall { + void meth() throws IOException; + } + private void do_selector_action(SelectorCall meth) throws IOException { + if (IS_ANDROID) { + wakeup_selector = true; + this.selector.wakeup(); + synchronized (this.selector) { + meth.meth(); + wakeup_selector = false; + } + } else { + meth.meth(); + } + } + private Peer setup_socket(SocketChannel chan) throws IOException { chan.configureBlocking(false); // Lightning tends to send a number of small messages back and forth between peers quickly, which Nagle is @@ -37,13 +66,13 @@ public class NioPeerHandler { SocketDescriptor descriptor = SocketDescriptor.new_impl(new SocketDescriptor.SocketDescriptorInterface() { @Override public long send_data(byte[] data, boolean resume_read) { - if (resume_read) { - peer.key.interestOps(peer.key.interestOps() | SelectionKey.OP_READ); - } try { + if (resume_read) { + do_selector_action(() -> peer.key.interestOps(peer.key.interestOps() | SelectionKey.OP_READ)); + } long written = chan.write(ByteBuffer.wrap(data)); if (written != data.length) { - peer.key.interestOps(peer.key.interestOps() | SelectionKey.OP_WRITE); + do_selector_action(() -> peer.key.interestOps(peer.key.interestOps() | SelectionKey.OP_WRITE)); } return written; } catch (IOException e) { @@ -54,10 +83,14 @@ public class NioPeerHandler { @Override public void disconnect_socket() { + synchronized (peer) { + peer.disconnect_requested = true; + } try { - peer.key.cancel(); - peer.key.channel().close(); - selector.wakeup(); + do_selector_action(() -> { + peer.key.cancel(); + peer.key.channel().close(); + }); } catch (IOException ignored) { } synchronized (peer) { while (peer.block_disconnect_socket) { @@ -76,7 +109,7 @@ public class NioPeerHandler { PeerManager peer_manager; Thread io_thread; - Selector selector; + final Selector selector; long socket_id; volatile boolean shutdown = false; @@ -88,7 +121,6 @@ public class NioPeerHandler { * @throws IOException If an internal java.nio error occurs. */ public NioPeerHandler(PeerManager manager) throws IOException { -long id = manager._test_only_get_ptr(); this.peer_manager = manager; this.selector = Selector.open(); io_thread = new Thread(() -> { @@ -96,7 +128,18 @@ long id = manager._test_only_get_ptr(); long lastTimerTick = System.currentTimeMillis(); while (true) { try { - this.selector.select(1000); + if (IS_ANDROID) { + while (true) { + synchronized (this.selector) { + if (!wakeup_selector) { + this.selector.select(1000); + break; + } + } + } + } else { + this.selector.select(1000); + } } catch (IOException ignored) { System.err.println("java.nio threw an unexpected IOException. Stopping PeerHandler thread!"); return; @@ -104,71 +147,79 @@ long id = manager._test_only_get_ptr(); if (shutdown) return; if (Thread.interrupted()) return; for (SelectionKey key : this.selector.selectedKeys()) { - if ((key.interestOps() & SelectionKey.OP_ACCEPT) != 0) { - if (key.isAcceptable()) { - SocketChannel chan; - try { - chan = ((ServerSocketChannel) key.channel()).accept(); - } catch (IOException ignored) { - key.cancel(); - continue; - } - if (chan == null) continue; - try { - Peer peer = setup_socket(chan); - Result_NonePeerHandleErrorZ res = this.peer_manager.new_inbound_connection(peer.descriptor); - if (res instanceof Result_NonePeerHandleErrorZ.Result_NonePeerHandleErrorZ_OK) { - peer.key = chan.register(this.selector, SelectionKey.OP_READ, peer); - } - } catch (IOException ignored) { } - } - continue; // There is no attachment so the rest of the loop is useless - } - Peer peer = (Peer) key.attachment(); - synchronized (peer) { - peer.block_disconnect_socket = true; - } try { - if (key.isValid() && (key.interestOps() & SelectionKey.OP_WRITE) != 0 && key.isWritable()) { - Result_NonePeerHandleErrorZ res = this.peer_manager.write_buffer_space_avail(peer.descriptor); - if (res instanceof Result_NonePeerHandleErrorZ.Result_NonePeerHandleErrorZ_Err) { - key.channel().close(); - key.cancel(); + if ((key.interestOps() & SelectionKey.OP_ACCEPT) != 0) { + if (key.isAcceptable()) { + SocketChannel chan; + try { + chan = ((ServerSocketChannel) key.channel()).accept(); + } catch (IOException ignored) { + key.cancel(); + continue; + } + if (chan == null) continue; + try { + Peer peer = setup_socket(chan); + Result_NonePeerHandleErrorZ res = this.peer_manager.new_inbound_connection(peer.descriptor); + if (res instanceof Result_NonePeerHandleErrorZ.Result_NonePeerHandleErrorZ_OK) { + peer.key = chan.register(this.selector, SelectionKey.OP_READ, peer); + } + } catch (IOException ignored) { } } + continue; // There is no attachment so the rest of the loop is useless } - if (key.isValid() && (key.interestOps() & SelectionKey.OP_READ) != 0 && key.isReadable()) { - buf.clear(); - int read = ((SocketChannel) key.channel()).read(buf); - if (read == -1) { - this.peer_manager.socket_disconnected(peer.descriptor); - key.cancel(); - } else if (read > 0) { - buf.flip(); - byte[] read_bytes = new byte[read]; - buf.get(read_bytes, 0, read); - Result_boolPeerHandleErrorZ res = this.peer_manager.read_event(peer.descriptor, read_bytes); - if (res instanceof Result_boolPeerHandleErrorZ.Result_boolPeerHandleErrorZ_OK) { - if (((Result_boolPeerHandleErrorZ.Result_boolPeerHandleErrorZ_OK) res).res) { - key.interestOps(key.interestOps() & (~SelectionKey.OP_READ)); - } - } else { + Peer peer = (Peer) key.attachment(); + synchronized (peer) { + if (peer.disconnect_requested) + continue; + peer.block_disconnect_socket = true; + } + try { + if (key.isValid() && (key.interestOps() & SelectionKey.OP_WRITE) != 0 && key.isWritable()) { + Result_NonePeerHandleErrorZ res = this.peer_manager.write_buffer_space_avail(peer.descriptor); + if (res instanceof Result_NonePeerHandleErrorZ.Result_NonePeerHandleErrorZ_Err) { key.channel().close(); key.cancel(); } } + if (key.isValid() && (key.interestOps() & SelectionKey.OP_READ) != 0 && key.isReadable()) { + ((Buffer)buf).clear(); + int read = ((SocketChannel) key.channel()).read(buf); + if (read == -1) { + this.peer_manager.socket_disconnected(peer.descriptor); + key.cancel(); + } else if (read > 0) { + ((Buffer)buf).flip(); + byte[] read_bytes = new byte[read]; + buf.get(read_bytes, 0, read); + Result_boolPeerHandleErrorZ res = this.peer_manager.read_event(peer.descriptor, read_bytes); + if (res instanceof Result_boolPeerHandleErrorZ.Result_boolPeerHandleErrorZ_OK) { + if (((Result_boolPeerHandleErrorZ.Result_boolPeerHandleErrorZ_OK) res).res) { + key.interestOps(key.interestOps() & (~SelectionKey.OP_READ)); + } + } else { + key.channel().close(); + key.cancel(); + } + } + } + } catch (IOException ignored) { + try { key.channel().close(); } catch (IOException ignored2) { } + key.cancel(); + peer_manager.socket_disconnected(peer.descriptor); } - } catch (IOException ignored) { - try { key.channel().close(); } catch (IOException ignored2) { } - key.cancel(); - peer_manager.socket_disconnected(peer.descriptor); - } - synchronized (peer) { - peer.block_disconnect_socket = false; - peer.notifyAll(); + synchronized (peer) { + peer.block_disconnect_socket = false; + peer.notifyAll(); + } + } catch (CancelledKeyException e) { + try { key.channel().close(); } catch (IOException ignored) { } + // The key is only cancelled when we have notified the PeerManager that the socket is closed, so + // no need to do anything here with the PeerManager. } } if (lastTimerTick < System.currentTimeMillis() - 30 * 1000) { - peer_manager.timer_tick_occured(); + peer_manager.timer_tick_occurred(); lastTimerTick = System.currentTimeMillis(); } peer_manager.process_events(); @@ -184,10 +235,20 @@ long id = manager._test_only_get_ptr(); * @param their_node_id A valid 33-byte public key representing the peer's Lightning Node ID. If this is invalid, * undefined behavior (read: Segfault, etc) may occur. * @param remote The socket address to connect to. + * @param timeout_ms The amount of time, in milliseconds, up to which we will wait for connection to complete. * @throws IOException If connecting to the remote endpoint fails or internal java.nio errors occur. */ - public void connect(byte[] their_node_id, SocketAddress remote) throws IOException { - SocketChannel chan = SocketChannel.open(remote); + public void connect(byte[] their_node_id, SocketAddress remote, int timeout_ms) throws IOException { + SocketChannel chan = SocketChannel.open(); + chan.configureBlocking(false); + Selector open_selector = Selector.open(); + chan.register(open_selector, SelectionKey.OP_CONNECT); + if (!chan.connect(remote)) { + open_selector.select(timeout_ms); + } + if (!chan.finishConnect()) { // Note that this may throw its own IOException if we failed for another reason + throw new IOException("Timed out"); + } Peer peer = setup_socket(chan); Result_CVec_u8ZPeerHandleErrorZ res = this.peer_manager.new_outbound_connection(their_node_id, peer.descriptor); if (res instanceof Result_CVec_u8ZPeerHandleErrorZ.Result_CVec_u8ZPeerHandleErrorZ_OK) { @@ -195,8 +256,7 @@ long id = manager._test_only_get_ptr(); if (chan.write(ByteBuffer.wrap(initial_bytes)) != initial_bytes.length) { throw new IOException("We assume TCP socket buffer is at least a single packet in length"); } - peer.key = chan.register(this.selector, SelectionKey.OP_READ, peer); - this.selector.wakeup(); + do_selector_action(() -> peer.key = chan.register(this.selector, SelectionKey.OP_READ, peer)); } else { throw new IOException("LDK rejected outbound connection. This likely shouldn't ever happen."); } @@ -213,8 +273,7 @@ long id = manager._test_only_get_ptr(); ServerSocketChannel listen_channel = ServerSocketChannel.open(); listen_channel.bind(socket_address); listen_channel.configureBlocking(false); - listen_channel.register(this.selector, SelectionKey.OP_ACCEPT); - this.selector.wakeup(); + do_selector_action(() -> listen_channel.register(this.selector, SelectionKey.OP_ACCEPT)); } /**