X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Forg%2Fldk%2Fbatteries%2FNioPeerHandler.java;h=bf111729115492ba7e3c94c26645290b000794d3;hb=e6173a79ce766cef7a660f22fc8fd1220975f6ea;hp=eac4d8058a898fd7a8af0a61ddb8e2be68627f92;hpb=07818350417051973536ab58828f03b70f72ac93;p=ldk-java diff --git a/src/main/java/org/ldk/batteries/NioPeerHandler.java b/src/main/java/org/ldk/batteries/NioPeerHandler.java index eac4d805..bf111729 100644 --- a/src/main/java/org/ldk/batteries/NioPeerHandler.java +++ b/src/main/java/org/ldk/batteries/NioPeerHandler.java @@ -1,8 +1,11 @@ package org.ldk.batteries; +import org.ldk.impl.bindings; import org.ldk.structs.*; import java.io.IOException; +import java.lang.reflect.Field; +import java.util.LinkedList; import java.net.SocketAddress; import java.net.StandardSocketOptions; import java.nio.Buffer; @@ -16,6 +19,7 @@ import java.nio.channels.*; public class NioPeerHandler { private static class Peer { SocketDescriptor descriptor; + long descriptor_raw_pointer; SelectionKey key; } @@ -44,6 +48,19 @@ public class NioPeerHandler { } } + static private Field CommonBasePointer; + static { + try { + Class c = PeerManager.class.getSuperclass(); + CommonBasePointer = c.getDeclaredField("ptr"); + CommonBasePointer.setAccessible(true); + long _dummy_check = CommonBasePointer.getLong(Ping.of((short)0, (short)0)); + } catch (NoSuchFieldException | IllegalAccessException e) { + throw new IllegalArgumentException( + "We currently use reflection to access protected fields as Java has no reasonable access controls", e); + } + } + private Peer setup_socket(SocketChannel chan) throws IOException { chan.configureBlocking(false); // Lightning tends to send a number of small messages back and forth between peers quickly, which Nagle is @@ -60,13 +77,14 @@ public class NioPeerHandler { @Override public long send_data(byte[] data, boolean resume_read) { try { - if (resume_read) { - do_selector_action(() -> peer.key.interestOps(peer.key.interestOps() | SelectionKey.OP_READ)); - } long written = chan.write(ByteBuffer.wrap(data)); if (written != data.length) { - do_selector_action(() -> peer.key.interestOps(peer.key.interestOps() | SelectionKey.OP_WRITE)); - } + do_selector_action(() -> peer.key.interestOps( + (peer.key.interestOps() | SelectionKey.OP_WRITE) & (~SelectionKey.OP_READ))); + } else if (resume_read) { + do_selector_action(() -> peer.key.interestOps( + (peer.key.interestOps() | SelectionKey.OP_READ) & (~SelectionKey.OP_WRITE))); + } return written; } catch (IOException e) { // Most likely the socket is disconnected, let the background thread handle it. @@ -87,6 +105,12 @@ public class NioPeerHandler { @Override public long hash() { return our_id; } }); peer.descriptor = descriptor; + try { + peer.descriptor_raw_pointer = CommonBasePointer.getLong(descriptor); + } catch (IllegalAccessException e) { + throw new IllegalArgumentException( + "We currently use reflection to access protected fields as Java has no reasonable access controls", e); + } return peer; } @@ -98,7 +122,6 @@ public class NioPeerHandler { /** * Constructs a new peer handler, spawning a thread to monitor for socket events. - * The background thread will call the PeerManager's timer_tick_occured() function for you on an appropriate schedule. * * @param manager The LDK PeerManager which connection data will be provided to. * @throws IOException If an internal java.nio error occurs. @@ -107,8 +130,16 @@ public class NioPeerHandler { this.peer_manager = manager; this.selector = Selector.open(); io_thread = new Thread(() -> { - ByteBuffer buf = ByteBuffer.allocate(8192); - long lastTimerTick = System.currentTimeMillis(); + int BUF_SZ = 16 * 1024; + byte[] max_buf_byte_object = new byte[BUF_SZ]; + ByteBuffer buf = ByteBuffer.allocate(BUF_SZ); + + long peer_manager_raw_pointer; + try { + peer_manager_raw_pointer = CommonBasePointer.getLong(this.peer_manager); + } catch (IllegalAccessException e) { + throw new RuntimeException(e); + } while (true) { try { if (IS_ANDROID) { @@ -168,17 +199,32 @@ public class NioPeerHandler { key.cancel(); } else if (read > 0) { ((Buffer)buf).flip(); - byte[] read_bytes = new byte[read]; + // This code is quite hot during initial network graph sync, so we go a ways out of + // our way to avoid object allocations that'll make the GC sweat later - + // * when we're hot, we'll likely often be reading the full buffer, so we keep + // around a full-buffer-sized byte array to reuse across reads, + // * We use the manual memory management call logic directly in bindings instead of + // the nice "human-readable" wrappers. This puts us at risk of memory issues, + // so we indirectly ensure compile fails if the types change by writing the + // "human-readable" form of the same code in the dummy function below. + byte[] read_bytes; + if (read == BUF_SZ) { + read_bytes = max_buf_byte_object; + } else { + read_bytes = new byte[read]; + } buf.get(read_bytes, 0, read); - Result_boolPeerHandleErrorZ res = this.peer_manager.read_event(peer.descriptor, read_bytes); - if (res instanceof Result_boolPeerHandleErrorZ.Result_boolPeerHandleErrorZ_OK) { - if (((Result_boolPeerHandleErrorZ.Result_boolPeerHandleErrorZ_OK) res).res) { + long read_result_pointer = bindings.PeerManager_read_event( + peer_manager_raw_pointer, peer.descriptor_raw_pointer, read_bytes); + if (bindings.LDKCResult_boolPeerHandleErrorZ_result_ok(read_result_pointer)) { + if (bindings.LDKCResult_boolPeerHandleErrorZ_get_ok(read_result_pointer)) { key.interestOps(key.interestOps() & (~SelectionKey.OP_READ)); } } else { key.channel().close(); key.cancel(); } + bindings.CResult_boolPeerHandleErrorZ_free(read_result_pointer); } } } catch (IOException ignored) { @@ -192,16 +238,19 @@ public class NioPeerHandler { // no need to do anything here with the PeerManager. } } - if (lastTimerTick < System.currentTimeMillis() - 30 * 1000) { - peer_manager.timer_tick_occurred(); - lastTimerTick = System.currentTimeMillis(); - } peer_manager.process_events(); } }, "NioPeerHandler NIO Thread"); io_thread.start(); } + // Ensure the types used in the above manual code match what they were when the code was written. + // Ensure the above manual bindings.* code changes if this fails to compile. + private void dummy_check_return_type_matches_manual_memory_code_above(Peer peer) { + byte[] read_bytes = new byte[32]; + Result_boolPeerHandleErrorZ res = this.peer_manager.read_event(peer.descriptor, read_bytes); + } + /** * Connect to a peer given their node id and socket address. Blocks until a connection is established (or returns * IOException) and then the connection handling runs in the background. @@ -236,6 +285,20 @@ public class NioPeerHandler { } } + /** + * Disconnects any connections currently open with the peer with the given node id. + * + * @param their_node_id must be a valid 33-byte public key + */ + public void disconnect(byte[] their_node_id) { + this.peer_manager.disconnect_by_node_id(their_node_id, false); + } + + /** + * Before shutdown, we have to ensure all of our listening sockets are closed manually, as they appear + * to otherwise remain open and lying around on OSX (though no other platform). + */ + private LinkedList listening_sockets = new LinkedList(); /** * Binds a listening socket to the given address, accepting incoming connections and handling them on the background * thread. @@ -245,9 +308,13 @@ public class NioPeerHandler { */ public void bind_listener(SocketAddress socket_address) throws IOException { ServerSocketChannel listen_channel = ServerSocketChannel.open(); + listen_channel.setOption(StandardSocketOptions.SO_REUSEADDR, true); listen_channel.bind(socket_address); listen_channel.configureBlocking(false); do_selector_action(() -> listen_channel.register(this.selector, SelectionKey.OP_ACCEPT)); + synchronized(listening_sockets) { + listening_sockets.add(listen_channel); + } } /** @@ -260,6 +327,14 @@ public class NioPeerHandler { try { io_thread.join(); } catch (InterruptedException ignored) { } + synchronized(listening_sockets) { + try { + selector.close(); + for (ServerSocketChannel chan : listening_sockets) { + chan.close(); + } + } catch (IOException ignored) {} + } } /**