X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Forg%2Fldk%2Fbatteries%2FNioPeerHandler.java;h=53a5353c2738f5ac4f0f0b1e9e471269b2e50196;hb=e2aa598aa07fde645c4fab04df6f49db84a5e6db;hp=faca477f599d284539385af23c0d09bde2987b0f;hpb=7d6be8a5ef72a4ebfe07660cce55f43f6cc30b80;p=ldk-java diff --git a/src/main/java/org/ldk/batteries/NioPeerHandler.java b/src/main/java/org/ldk/batteries/NioPeerHandler.java index faca477f..53a5353c 100644 --- a/src/main/java/org/ldk/batteries/NioPeerHandler.java +++ b/src/main/java/org/ldk/batteries/NioPeerHandler.java @@ -1,8 +1,12 @@ package org.ldk.batteries; +import org.ldk.impl.bindings; import org.ldk.structs.*; import java.io.IOException; +import java.lang.reflect.Field; +import java.lang.ref.Reference; +import java.util.LinkedList; import java.net.SocketAddress; import java.net.StandardSocketOptions; import java.nio.Buffer; @@ -16,6 +20,7 @@ import java.nio.channels.*; public class NioPeerHandler { private static class Peer { SocketDescriptor descriptor; + long descriptor_raw_pointer; SelectionKey key; } @@ -44,6 +49,19 @@ public class NioPeerHandler { } } + static private Field CommonBasePointer; + static { + try { + Class c = PeerManager.class.getSuperclass(); + CommonBasePointer = c.getDeclaredField("ptr"); + CommonBasePointer.setAccessible(true); + long _dummy_check = CommonBasePointer.getLong(Ping.of((short)0, (short)0)); + } catch (NoSuchFieldException | IllegalAccessException e) { + throw new IllegalArgumentException( + "We currently use reflection to access protected fields as Java has no reasonable access controls", e); + } + } + private Peer setup_socket(SocketChannel chan) throws IOException { chan.configureBlocking(false); // Lightning tends to send a number of small messages back and forth between peers quickly, which Nagle is @@ -60,13 +78,14 @@ public class NioPeerHandler { @Override public long send_data(byte[] data, boolean resume_read) { try { - if (resume_read) { - do_selector_action(() -> peer.key.interestOps(peer.key.interestOps() | SelectionKey.OP_READ)); - } long written = chan.write(ByteBuffer.wrap(data)); if (written != data.length) { - do_selector_action(() -> peer.key.interestOps(peer.key.interestOps() | SelectionKey.OP_WRITE)); - } + do_selector_action(() -> peer.key.interestOps( + (peer.key.interestOps() | SelectionKey.OP_WRITE) & (~SelectionKey.OP_READ))); + } else if (resume_read) { + do_selector_action(() -> peer.key.interestOps( + (peer.key.interestOps() | SelectionKey.OP_READ) & (~SelectionKey.OP_WRITE))); + } return written; } catch (IOException e) { // Most likely the socket is disconnected, let the background thread handle it. @@ -87,6 +106,12 @@ public class NioPeerHandler { @Override public long hash() { return our_id; } }); peer.descriptor = descriptor; + try { + peer.descriptor_raw_pointer = CommonBasePointer.getLong(descriptor); + } catch (IllegalAccessException e) { + throw new IllegalArgumentException( + "We currently use reflection to access protected fields as Java has no reasonable access controls", e); + } return peer; } @@ -106,7 +131,16 @@ public class NioPeerHandler { this.peer_manager = manager; this.selector = Selector.open(); io_thread = new Thread(() -> { - ByteBuffer buf = ByteBuffer.allocate(8192); + int BUF_SZ = 16 * 1024; + byte[] max_buf_byte_object = new byte[BUF_SZ]; + ByteBuffer buf = ByteBuffer.allocate(BUF_SZ); + + long peer_manager_raw_pointer; + try { + peer_manager_raw_pointer = CommonBasePointer.getLong(this.peer_manager); + } catch (IllegalAccessException e) { + throw new RuntimeException(e); + } while (true) { try { if (IS_ANDROID) { @@ -141,10 +175,11 @@ public class NioPeerHandler { if (chan == null) continue; try { Peer peer = setup_socket(chan); + peer.key = chan.register(this.selector, SelectionKey.OP_READ, peer); Result_NonePeerHandleErrorZ res = this.peer_manager.new_inbound_connection(peer.descriptor); - if (res instanceof Result_NonePeerHandleErrorZ.Result_NonePeerHandleErrorZ_OK) { - peer.key = chan.register(this.selector, SelectionKey.OP_READ, peer); - } + if (res instanceof Result_NonePeerHandleErrorZ.Result_NonePeerHandleErrorZ_Err) { + peer.descriptor.disconnect_socket(); + } } catch (IOException ignored) { } } continue; // There is no attachment so the rest of the loop is useless @@ -166,17 +201,32 @@ public class NioPeerHandler { key.cancel(); } else if (read > 0) { ((Buffer)buf).flip(); - byte[] read_bytes = new byte[read]; + // This code is quite hot during initial network graph sync, so we go a ways out of + // our way to avoid object allocations that'll make the GC sweat later - + // * when we're hot, we'll likely often be reading the full buffer, so we keep + // around a full-buffer-sized byte array to reuse across reads, + // * We use the manual memory management call logic directly in bindings instead of + // the nice "human-readable" wrappers. This puts us at risk of memory issues, + // so we indirectly ensure compile fails if the types change by writing the + // "human-readable" form of the same code in the dummy function below. + byte[] read_bytes; + if (read == BUF_SZ) { + read_bytes = max_buf_byte_object; + } else { + read_bytes = new byte[read]; + } buf.get(read_bytes, 0, read); - Result_boolPeerHandleErrorZ res = this.peer_manager.read_event(peer.descriptor, read_bytes); - if (res instanceof Result_boolPeerHandleErrorZ.Result_boolPeerHandleErrorZ_OK) { - if (((Result_boolPeerHandleErrorZ.Result_boolPeerHandleErrorZ_OK) res).res) { + long read_result_pointer = bindings.PeerManager_read_event( + peer_manager_raw_pointer, peer.descriptor_raw_pointer, read_bytes); + if (bindings.CResult_boolPeerHandleErrorZ_is_ok(read_result_pointer)) { + if (bindings.LDKCResult_boolPeerHandleErrorZ_get_ok(read_result_pointer)) { key.interestOps(key.interestOps() & (~SelectionKey.OP_READ)); } } else { key.channel().close(); key.cancel(); } + bindings.CResult_boolPeerHandleErrorZ_free(read_result_pointer); } } } catch (IOException ignored) { @@ -196,6 +246,13 @@ public class NioPeerHandler { io_thread.start(); } + // Ensure the types used in the above manual code match what they were when the code was written. + // Ensure the above manual bindings.* code changes if this fails to compile. + private void dummy_check_return_type_matches_manual_memory_code_above(Peer peer) { + byte[] read_bytes = new byte[32]; + Result_boolPeerHandleErrorZ res = this.peer_manager.read_event(peer.descriptor, read_bytes); + } + /** * Connect to a peer given their node id and socket address. Blocks until a connection is established (or returns * IOException) and then the connection handling runs in the background. @@ -218,14 +275,17 @@ public class NioPeerHandler { throw new IOException("Timed out"); } Peer peer = setup_socket(chan); + do_selector_action(() -> peer.key = chan.register(this.selector, SelectionKey.OP_READ, peer)); Result_CVec_u8ZPeerHandleErrorZ res = this.peer_manager.new_outbound_connection(their_node_id, peer.descriptor); if (res instanceof Result_CVec_u8ZPeerHandleErrorZ.Result_CVec_u8ZPeerHandleErrorZ_OK) { byte[] initial_bytes = ((Result_CVec_u8ZPeerHandleErrorZ.Result_CVec_u8ZPeerHandleErrorZ_OK) res).res; if (chan.write(ByteBuffer.wrap(initial_bytes)) != initial_bytes.length) { + peer.descriptor.disconnect_socket(); + this.peer_manager.socket_disconnected(peer.descriptor); throw new IOException("We assume TCP socket buffer is at least a single packet in length"); } - do_selector_action(() -> peer.key = chan.register(this.selector, SelectionKey.OP_READ, peer)); } else { + peer.descriptor.disconnect_socket(); throw new IOException("LDK rejected outbound connection. This likely shouldn't ever happen."); } } @@ -239,6 +299,11 @@ public class NioPeerHandler { this.peer_manager.disconnect_by_node_id(their_node_id, false); } + /** + * Before shutdown, we have to ensure all of our listening sockets are closed manually, as they appear + * to otherwise remain open and lying around on OSX (though no other platform). + */ + private LinkedList listening_sockets = new LinkedList(); /** * Binds a listening socket to the given address, accepting incoming connections and handling them on the background * thread. @@ -248,9 +313,13 @@ public class NioPeerHandler { */ public void bind_listener(SocketAddress socket_address) throws IOException { ServerSocketChannel listen_channel = ServerSocketChannel.open(); + listen_channel.setOption(StandardSocketOptions.SO_REUSEADDR, true); listen_channel.bind(socket_address); listen_channel.configureBlocking(false); do_selector_action(() -> listen_channel.register(this.selector, SelectionKey.OP_ACCEPT)); + synchronized(listening_sockets) { + listening_sockets.add(listen_channel); + } } /** @@ -263,6 +332,15 @@ public class NioPeerHandler { try { io_thread.join(); } catch (InterruptedException ignored) { } + synchronized(listening_sockets) { + try { + selector.close(); + for (ServerSocketChannel chan : listening_sockets) { + chan.close(); + } + } catch (IOException ignored) {} + } + Reference.reachabilityFence(this.peer_manager); // Almost certainly overkill, but no harm in it } /**