From 3dcd430967b250ca9020d0f57a7fa5f8f5904aa9 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Mon, 1 Feb 2021 00:40:15 -0500 Subject: [PATCH] Add NioPeerHandler and associated test --- .../org/ldk/batteries/NioPeerHandler.java | 239 ++++++++++++++++++ .../java/org/ldk/HumanObjectPeerTest.java | 139 ++++++---- 2 files changed, 329 insertions(+), 49 deletions(-) create mode 100644 src/main/java/org/ldk/batteries/NioPeerHandler.java diff --git a/src/main/java/org/ldk/batteries/NioPeerHandler.java b/src/main/java/org/ldk/batteries/NioPeerHandler.java new file mode 100644 index 00000000..29949622 --- /dev/null +++ b/src/main/java/org/ldk/batteries/NioPeerHandler.java @@ -0,0 +1,239 @@ +package org.ldk.batteries; + +import org.ldk.structs.*; + +import java.io.IOException; +import java.net.SocketAddress; +import java.net.StandardSocketOptions; +import java.nio.ByteBuffer; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; + +public class NioPeerHandler { + private static class Peer { + SocketDescriptor descriptor; + // When we are told by LDK to disconnect, we can't return to LDK until we are sure + // won't call any more read/write PeerManager functions with the same connection. + // This is set to true if we're in such a condition (with disconnect checked + // before with the Peer monitor lock held) and false when we can return. + boolean block_disconnect_socket = false; + SelectionKey key; + } + + private Peer setup_socket(SocketChannel chan) throws IOException { + chan.configureBlocking(false); + // Lightning tends to send a number of small messages back and forth between peers quickly, which Nagle is + // particularly bad at handling, so we disable it here. + chan.setOption(StandardSocketOptions.TCP_NODELAY, true); + long our_id; + synchronized (this) { + this.socket_id = this.socket_id + 1; + our_id = this.socket_id; + } + + final Peer peer = new Peer(); + SocketDescriptor descriptor = SocketDescriptor.new_impl(new SocketDescriptor.SocketDescriptorInterface() { + @Override + public long send_data(byte[] data, boolean resume_read) { + if (resume_read) { + peer.key.interestOps(peer.key.interestOps() | SelectionKey.OP_READ); + } + try { + long written = chan.write(ByteBuffer.wrap(data)); + if (written != data.length) { + peer.key.interestOps(peer.key.interestOps() | SelectionKey.OP_WRITE); + } + return written; + } catch (IOException e) { + // Most likely the socket is disconnected, let the background thread handle it. + return 0; + } + } + + @Override + public void disconnect_socket() { + try { + peer.key.cancel(); + peer.key.channel().close(); + selector.wakeup(); + } catch (IOException ignored) { } + synchronized (peer) { + while (peer.block_disconnect_socket) { + try { + peer.wait(); + } catch (InterruptedException ignored) { } + } + } + } + @Override public boolean eq(SocketDescriptor other_arg) { return other_arg.hash() == our_id; } + @Override public long hash() { return our_id; } + }); + peer.descriptor = descriptor; + return peer; + } + + PeerManager peer_manager; + Thread io_thread; + Selector selector; + long socket_id; + volatile boolean shutdown = false; + + /** + * Constructs a new peer handler, spawning a thread to monitor for socket events. + * The background thread will call the PeerManager's timer_tick_occured() function for you on an appropriate schedule. + * + * @param manager The LDK PeerManager which connection data will be provided to. + * @throws IOException If an internal java.nio error occurs. + */ + public NioPeerHandler(PeerManager manager) throws IOException { +long id = manager._test_only_get_ptr(); + this.peer_manager = manager; + this.selector = Selector.open(); + io_thread = new Thread(() -> { + ByteBuffer buf = ByteBuffer.allocate(8192); + long lastTimerTick = System.currentTimeMillis(); + while (true) { + try { + this.selector.select(1000); + } catch (IOException ignored) { + System.err.println("java.nio threw an unexpected IOException. Stopping PeerHandler thread!"); + return; + } + if (shutdown) return; + if (Thread.interrupted()) return; + for (SelectionKey key : this.selector.selectedKeys()) { + if ((key.interestOps() & SelectionKey.OP_ACCEPT) != 0) { + if (key.isAcceptable()) { + SocketChannel chan; + try { + chan = ((ServerSocketChannel) key.channel()).accept(); + } catch (IOException ignored) { + key.cancel(); + continue; + } + if (chan == null) continue; + try { + Peer peer = setup_socket(chan); + Result_NonePeerHandleErrorZ res = this.peer_manager.new_inbound_connection(peer.descriptor); + if (res instanceof Result_NonePeerHandleErrorZ.Result_NonePeerHandleErrorZ_OK) { + peer.key = chan.register(this.selector, SelectionKey.OP_READ, peer); + } + } catch (IOException ignored) { } + } + continue; // There is no attachment so the rest of the loop is useless + } + Peer peer = (Peer) key.attachment(); + synchronized (peer) { + peer.block_disconnect_socket = true; + } + try { + if (key.isValid() && (key.interestOps() & SelectionKey.OP_WRITE) != 0 && key.isWritable()) { + Result_NonePeerHandleErrorZ res = this.peer_manager.write_buffer_space_avail(peer.descriptor); + if (res instanceof Result_NonePeerHandleErrorZ.Result_NonePeerHandleErrorZ_Err) { + key.channel().close(); + key.cancel(); + } + } + if (key.isValid() && (key.interestOps() & SelectionKey.OP_READ) != 0 && key.isReadable()) { + buf.clear(); + int read = ((SocketChannel) key.channel()).read(buf); + if (read == -1) { + this.peer_manager.socket_disconnected(peer.descriptor); + key.cancel(); + } else if (read > 0) { + buf.flip(); + byte[] read_bytes = new byte[read]; + buf.get(read_bytes, 0, read); + Result_boolPeerHandleErrorZ res = this.peer_manager.read_event(peer.descriptor, read_bytes); + if (res instanceof Result_boolPeerHandleErrorZ.Result_boolPeerHandleErrorZ_OK) { + if (((Result_boolPeerHandleErrorZ.Result_boolPeerHandleErrorZ_OK) res).res) { + key.interestOps(key.interestOps() & (~SelectionKey.OP_READ)); + } + } else { + key.channel().close(); + key.cancel(); + } + } + } + } catch (IOException ignored) { + try { key.channel().close(); } catch (IOException ignored2) { } + key.cancel(); + peer_manager.socket_disconnected(peer.descriptor); + } + synchronized (peer) { + peer.block_disconnect_socket = false; + peer.notifyAll(); + } + } + if (lastTimerTick < System.currentTimeMillis() - 30 * 1000) { + peer_manager.timer_tick_occured(); + lastTimerTick = System.currentTimeMillis(); + } + peer_manager.process_events(); + } + }, "NioPeerHandler NIO Thread"); + io_thread.start(); + } + + /** + * Connect to a peer given their node id and socket address. Blocks until a connection is established (or returns + * IOException) and then the connection handling runs in the background. + * + * @param their_node_id A valid 33-byte public key representing the peer's Lightning Node ID. If this is invalid, + * undefined behavior (read: Segfault, etc) may occur. + * @param remote The socket address to connect to. + * @throws IOException If connecting to the remote endpoint fails or internal java.nio errors occur. + */ + public void connect(byte[] their_node_id, SocketAddress remote) throws IOException { + SocketChannel chan = SocketChannel.open(remote); + Peer peer = setup_socket(chan); + Result_CVec_u8ZPeerHandleErrorZ res = this.peer_manager.new_outbound_connection(their_node_id, peer.descriptor); + if (res instanceof Result_CVec_u8ZPeerHandleErrorZ.Result_CVec_u8ZPeerHandleErrorZ_OK) { + byte[] initial_bytes = ((Result_CVec_u8ZPeerHandleErrorZ.Result_CVec_u8ZPeerHandleErrorZ_OK) res).res; + if (chan.write(ByteBuffer.wrap(initial_bytes)) != initial_bytes.length) { + throw new IOException("We assume TCP socket buffer is at least a single packet in length"); + } + peer.key = chan.register(this.selector, SelectionKey.OP_READ, peer); + this.selector.wakeup(); + } else { + throw new IOException("LDK rejected outbound connection. This likely shouldn't ever happen."); + } + } + + /** + * Binds a listening socket to the given address, accepting incoming connections and handling them on the background + * thread. + * + * @param socket_address The address to bind the listening socket to. + * @throws IOException if binding the listening socket fail. + */ + public void bind_listener(SocketAddress socket_address) throws IOException { + ServerSocketChannel listen_channel = ServerSocketChannel.open(); + listen_channel.bind(socket_address); + listen_channel.configureBlocking(false); + listen_channel.register(this.selector, SelectionKey.OP_ACCEPT); + this.selector.wakeup(); + } + + /** + * Interrupt the background thread, stopping all peer handling. Disconnection events to the PeerHandler are not made, + * potentially leaving the PeerHandler in an inconsistent state. + */ + public void interrupt() { + shutdown = true; + selector.wakeup(); + try { + io_thread.join(); + } catch (InterruptedException ignored) { } + } + + /** + * Calls process_events on the PeerManager immediately. Normally process_events is polled regularly to check for new + * messages which need to be sent, but you can interrupt the poll and check immediately by calling this function. + */ + public void check_events() { + selector.wakeup(); + } +} \ No newline at end of file diff --git a/src/test/java/org/ldk/HumanObjectPeerTest.java b/src/test/java/org/ldk/HumanObjectPeerTest.java index b47b6e8e..27ddc7c2 100644 --- a/src/test/java/org/ldk/HumanObjectPeerTest.java +++ b/src/test/java/org/ldk/HumanObjectPeerTest.java @@ -4,12 +4,15 @@ import org.bitcoinj.core.*; import org.bitcoinj.core.Transaction; import org.bitcoinj.script.Script; import org.junit.jupiter.api.Test; +import org.ldk.batteries.NioPeerHandler; import org.ldk.enums.LDKNetwork; import org.ldk.impl.bindings; import org.ldk.structs.*; import org.ldk.util.TwoTuple; +import java.io.IOException; import java.lang.ref.WeakReference; +import java.net.InetSocketAddress; import java.util.Arrays; import java.util.HashMap; import java.util.LinkedList; @@ -20,13 +23,15 @@ class HumanObjectPeerTestInstance { private final boolean use_manual_watch; private final boolean reload_peers; private final boolean break_cross_peer_refs; + private final boolean use_nio_peer_handler; - HumanObjectPeerTestInstance(boolean nice_close, boolean use_km_wrapper, boolean use_manual_watch, boolean reload_peers, boolean break_cross_peer_refs) { + HumanObjectPeerTestInstance(boolean nice_close, boolean use_km_wrapper, boolean use_manual_watch, boolean reload_peers, boolean break_cross_peer_refs, boolean use_nio_peer_handler) { this.nice_close = nice_close; this.use_km_wrapper = use_km_wrapper; this.use_manual_watch = use_manual_watch; this.reload_peers = reload_peers; this.break_cross_peer_refs = break_cross_peer_refs; + this.use_nio_peer_handler = use_nio_peer_handler; } class Peer { @@ -150,6 +155,8 @@ class HumanObjectPeerTestInstance { return watch; } + NioPeerHandler nio_peer_handler; + short nio_port; final byte seed; final Logger logger; final FeeEstimator fee_estimator; @@ -221,15 +228,25 @@ class HumanObjectPeerTestInstance { for (byte i = 0; i < 32; i++) { key_seed[i] = (byte) (i ^ seed); } + KeysManager keys = KeysManager.constructor_new(key_seed, LDKNetwork.LDKNetwork_Bitcoin, System.currentTimeMillis() / 1000, (int) (System.currentTimeMillis() * 1000)); if (use_km_wrapper) { - KeysManager underlying = KeysManager.constructor_new(key_seed, LDKNetwork.LDKNetwork_Bitcoin, System.currentTimeMillis() / 1000, (int) (System.currentTimeMillis() * 1000) & 0xffffffff); - this.keys_interface = manual_keysif(underlying.as_KeysInterface()); + this.keys_interface = manual_keysif(keys.as_KeysInterface()); } else { - KeysManager keys = KeysManager.constructor_new(key_seed, LDKNetwork.LDKNetwork_Bitcoin, System.currentTimeMillis() / 1000, (int) (System.currentTimeMillis() * 1000) & 0xffffffff); this.keys_interface = keys.as_KeysInterface(); } this.router = NetGraphMsgHandler.constructor_new(new byte[32], null, logger); } + private void bind_nio() { + if (!use_nio_peer_handler) return; + try { this.nio_peer_handler = new NioPeerHandler(peer_manager); } catch (IOException e) { assert false; } + for (short i = 10_000; true; i++) { + try { + nio_peer_handler.bind_listener(new InetSocketAddress("127.0.0.1", i)); + nio_port = i; + break; + } catch (IOException e) { assert i < 10_500; } + } + } Peer(byte seed) { this(null, seed); this.chan_manager = ChannelManager.constructor_new(LDKNetwork.LDKNetwork_Bitcoin, FeeEstimator.new_impl(confirmation_target -> 0), chain_watch, tx_broadcaster, logger, this.keys_interface, UserConfig.constructor_default(), 1); @@ -241,6 +258,7 @@ class HumanObjectPeerTestInstance { random_data[i] = (byte) ((i ^ seed) ^ 0xf0); } this.peer_manager = PeerManager.constructor_new(chan_manager.as_ChannelMessageHandler(), router.as_RoutingMessageHandler(), keys_interface.get_node_secret(), random_data, logger); + bind_nio(); System.gc(); } Object ptr_to; @@ -287,6 +305,7 @@ class HumanObjectPeerTestInstance { random_data[i] = (byte) ((i ^ seed) ^ 0xf0); } this.peer_manager = PeerManager.constructor_new(chan_manager.as_ChannelMessageHandler(), router.as_RoutingMessageHandler(), keys_interface.get_node_secret(), random_data, logger); + bind_nio(); } TwoTuple[]>[] connect_block(Block b, int height, long expected_monitor_update_len) { @@ -357,20 +376,26 @@ class HumanObjectPeerTestInstance { } }); void wait_events_processed(Peer peer1, Peer peer2) { - synchronized (runqueue) { - ran = false; - } - while (true) { - peer1.peer_manager.process_events(); - peer2.peer_manager.process_events(); + if (use_nio_peer_handler) { + peer1.nio_peer_handler.check_events(); + peer2.nio_peer_handler.check_events(); + try { Thread.sleep(500); } catch (InterruptedException e) { assert false; } + } else { synchronized (runqueue) { - if (runqueue.isEmpty() && !running) { - if (ran) { - ran = false; - continue; - } else { break; } + ran = false; + } + while (true) { + peer1.peer_manager.process_events(); + peer2.peer_manager.process_events(); + synchronized (runqueue) { + if (runqueue.isEmpty() && !running) { + if (ran) { + ran = false; + continue; + } else { break; } + } + try { runqueue.wait(); } catch (InterruptedException e) { assert false; } } - try { runqueue.wait(); } catch (InterruptedException e) { assert false; } } } } @@ -388,38 +413,44 @@ class HumanObjectPeerTestInstance { } void connect_peers(final Peer peer1, final Peer peer2) { - DescriptorHolder descriptor1 = new DescriptorHolder(); - DescriptorHolder descriptor1ref = descriptor1; - SocketDescriptor descriptor2 = SocketDescriptor.new_impl(new SocketDescriptor.SocketDescriptorInterface() { - @Override - public long send_data(byte[] data, boolean resume_read) { - do_read_event(peer1.peer_manager, descriptor1ref.val, data); - return data.length; - } + if (use_nio_peer_handler) { + try { + peer1.nio_peer_handler.connect(peer2.chan_manager.get_our_node_id(), new InetSocketAddress("127.0.0.1", peer2.nio_port)); + } catch (IOException e) { assert false; } + } else { + DescriptorHolder descriptor1 = new DescriptorHolder(); + DescriptorHolder descriptor1ref = descriptor1; + SocketDescriptor descriptor2 = SocketDescriptor.new_impl(new SocketDescriptor.SocketDescriptorInterface() { + @Override + public long send_data(byte[] data, boolean resume_read) { + do_read_event(peer1.peer_manager, descriptor1ref.val, data); + return data.length; + } - @Override public void disconnect_socket() { assert false; } - @Override public boolean eq(SocketDescriptor other_arg) { return other_arg.hash() == 2; } - @Override public long hash() { return 2; } - }); + @Override public void disconnect_socket() { assert false; } + @Override public boolean eq(SocketDescriptor other_arg) { return other_arg.hash() == 2; } + @Override public long hash() { return 2; } + }); - descriptor1.val = SocketDescriptor.new_impl(new SocketDescriptor.SocketDescriptorInterface() { - @Override - public long send_data(byte[] data, boolean resume_read) { - do_read_event(peer2.peer_manager, descriptor2, data); - return data.length; - } + descriptor1.val = SocketDescriptor.new_impl(new SocketDescriptor.SocketDescriptorInterface() { + @Override + public long send_data(byte[] data, boolean resume_read) { + do_read_event(peer2.peer_manager, descriptor2, data); + return data.length; + } - @Override public void disconnect_socket() { assert false; } - @Override public boolean eq(SocketDescriptor other_arg) { return other_arg.hash() == 1; } - @Override public long hash() { return 1; } - }); + @Override public void disconnect_socket() { assert false; } + @Override public boolean eq(SocketDescriptor other_arg) { return other_arg.hash() == 1; } + @Override public long hash() { return 1; } + }); - Result_CVec_u8ZPeerHandleErrorZ conn_res = peer1.peer_manager.new_outbound_connection(peer2.node_id, descriptor1.val); - assert conn_res instanceof Result_CVec_u8ZPeerHandleErrorZ.Result_CVec_u8ZPeerHandleErrorZ_OK; + Result_CVec_u8ZPeerHandleErrorZ conn_res = peer1.peer_manager.new_outbound_connection(peer2.node_id, descriptor1.val); + assert conn_res instanceof Result_CVec_u8ZPeerHandleErrorZ.Result_CVec_u8ZPeerHandleErrorZ_OK; - Result_NonePeerHandleErrorZ inbound_conn_res = peer2.peer_manager.new_inbound_connection(descriptor2); - assert inbound_conn_res instanceof Result_NonePeerHandleErrorZ.Result_NonePeerHandleErrorZ_OK; - do_read_event(peer2.peer_manager, descriptor2, ((Result_CVec_u8ZPeerHandleErrorZ.Result_CVec_u8ZPeerHandleErrorZ_OK) conn_res).res); + Result_NonePeerHandleErrorZ inbound_conn_res = peer2.peer_manager.new_inbound_connection(descriptor2); + assert inbound_conn_res instanceof Result_NonePeerHandleErrorZ.Result_NonePeerHandleErrorZ_OK; + do_read_event(peer2.peer_manager, descriptor2, ((Result_CVec_u8ZPeerHandleErrorZ.Result_CVec_u8ZPeerHandleErrorZ_OK) conn_res).res); + } } TestState do_test_message_handler() throws InterruptedException { @@ -487,6 +518,10 @@ class HumanObjectPeerTestInstance { wait_events_processed(peer1, peer2); if (reload_peers) { + if (use_nio_peer_handler) { + peer1.nio_peer_handler.interrupt(); + peer2.nio_peer_handler.interrupt(); + } WeakReference op1 = new WeakReference(peer1); peer1 = new Peer(peer1); peer2 = new Peer(peer2); @@ -573,6 +608,11 @@ class HumanObjectPeerTestInstance { byte[][] txn = mon.get_latest_holder_commitment_txn(state.peer2.logger); } } + + if (use_nio_peer_handler) { + state.peer1.nio_peer_handler.interrupt(); + state.peer2.nio_peer_handler.interrupt(); + } } java.util.LinkedList> must_free_objs = new java.util.LinkedList(); @@ -586,14 +626,14 @@ class HumanObjectPeerTestInstance { } } public class HumanObjectPeerTest { - HumanObjectPeerTestInstance do_test_run(boolean nice_close, boolean use_km_wrapper, boolean use_manual_watch, boolean reload_peers, boolean break_cross_peer_refs) throws InterruptedException { - HumanObjectPeerTestInstance instance = new HumanObjectPeerTestInstance(nice_close, use_km_wrapper, use_manual_watch, reload_peers, break_cross_peer_refs); + HumanObjectPeerTestInstance do_test_run(boolean nice_close, boolean use_km_wrapper, boolean use_manual_watch, boolean reload_peers, boolean break_cross_peer_refs, boolean nio_peer_handler) throws InterruptedException { + HumanObjectPeerTestInstance instance = new HumanObjectPeerTestInstance(nice_close, use_km_wrapper, use_manual_watch, reload_peers, break_cross_peer_refs, nio_peer_handler); HumanObjectPeerTestInstance.TestState state = instance.do_test_message_handler(); instance.do_test_message_handler_b(state); return instance; } - void do_test(boolean nice_close, boolean use_km_wrapper, boolean use_manual_watch, boolean reload_peers, boolean break_cross_peer_refs) throws InterruptedException { - HumanObjectPeerTestInstance instance = do_test_run(nice_close, use_km_wrapper, use_manual_watch, reload_peers, break_cross_peer_refs); + void do_test(boolean nice_close, boolean use_km_wrapper, boolean use_manual_watch, boolean reload_peers, boolean break_cross_peer_refs, boolean nio_peer_handler) throws InterruptedException { + HumanObjectPeerTestInstance instance = do_test_run(nice_close, use_km_wrapper, use_manual_watch, reload_peers, break_cross_peer_refs, nio_peer_handler); while (!instance.gc_ran) { System.gc(); System.runFinalization(); @@ -603,18 +643,19 @@ public class HumanObjectPeerTest { } @Test public void test_message_handler() throws InterruptedException { - for (int i = 0; i < (1 << 5) - 1; i++) { + for (int i = 0; i < (1 << 6) - 1; i++) { boolean nice_close = (i & (1 << 0)) != 0; boolean use_km_wrapper = (i & (1 << 1)) != 0; boolean use_manual_watch = (i & (1 << 2)) != 0; boolean reload_peers = (i & (1 << 3)) != 0; boolean break_cross_refs = (i & (1 << 4)) != 0; + boolean nio_peer_handler = (i & (1 << 5)) != 0; if (break_cross_refs && !reload_peers) { // There are no cross refs to break without reloading peers. continue; } System.err.println("Running test with flags " + i); - do_test(nice_close, use_km_wrapper, use_manual_watch, reload_peers, break_cross_refs); + do_test(nice_close, use_km_wrapper, use_manual_watch, reload_peers, break_cross_refs, nio_peer_handler); } } } -- 2.39.5