X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Forg%2Fldk%2Fbatteries%2FNioPeerHandler.java;h=2d38726450b435c52c9ec63f07383f593c9a1e30;hb=1ef2dc8b8a3e0e244ad82ac08e286df489eca4b7;hp=bf111729115492ba7e3c94c26645290b000794d3;hpb=e6173a79ce766cef7a660f22fc8fd1220975f6ea;p=ldk-java diff --git a/src/main/java/org/ldk/batteries/NioPeerHandler.java b/src/main/java/org/ldk/batteries/NioPeerHandler.java index bf111729..2d387264 100644 --- a/src/main/java/org/ldk/batteries/NioPeerHandler.java +++ b/src/main/java/org/ldk/batteries/NioPeerHandler.java @@ -5,9 +5,9 @@ import org.ldk.structs.*; import java.io.IOException; import java.lang.reflect.Field; +import java.lang.ref.Reference; +import java.net.*; import java.util.LinkedList; -import java.net.SocketAddress; -import java.net.StandardSocketOptions; import java.nio.Buffer; import java.nio.ByteBuffer; import java.nio.channels.*; @@ -86,7 +86,7 @@ public class NioPeerHandler { (peer.key.interestOps() | SelectionKey.OP_READ) & (~SelectionKey.OP_WRITE))); } return written; - } catch (IOException e) { + } catch (IOException|CancelledKeyException ignored) { // Most likely the socket is disconnected, let the background thread handle it. return 0; } @@ -96,7 +96,7 @@ public class NioPeerHandler { public void disconnect_socket() { try { do_selector_action(() -> { - peer.key.cancel(); + try { peer.key.cancel(); } catch (CancelledKeyException ignored) {} peer.key.channel().close(); }); } catch (IOException ignored) { } @@ -120,6 +120,19 @@ public class NioPeerHandler { long socket_id; volatile boolean shutdown = false; + private static Option_NetAddressZ get_netaddr_from_sockaddr(SocketAddress sockaddr) { + if (sockaddr instanceof InetSocketAddress) { + InetAddress addr = ((InetSocketAddress) sockaddr).getAddress(); + short port = (short) ((InetSocketAddress) sockaddr).getPort(); + if (addr instanceof Inet4Address) { + return Option_NetAddressZ.some(NetAddress.ipv4(addr.getAddress(), port)); + } else if (addr instanceof Inet6Address) { + return Option_NetAddressZ.some(NetAddress.ipv6(addr.getAddress(), port)); + } + } + return Option_NetAddressZ.none(); + } + /** * Constructs a new peer handler, spawning a thread to monitor for socket events. * @@ -174,10 +187,12 @@ public class NioPeerHandler { if (chan == null) continue; try { Peer peer = setup_socket(chan); - Result_NonePeerHandleErrorZ res = this.peer_manager.new_inbound_connection(peer.descriptor); - if (res instanceof Result_NonePeerHandleErrorZ.Result_NonePeerHandleErrorZ_OK) { - peer.key = chan.register(this.selector, SelectionKey.OP_READ, peer); - } + peer.key = chan.register(this.selector, SelectionKey.OP_READ, peer); + Option_NetAddressZ netaddr = get_netaddr_from_sockaddr(chan.getRemoteAddress()); + Result_NonePeerHandleErrorZ res = this.peer_manager.new_inbound_connection(peer.descriptor, netaddr); + if (res instanceof Result_NonePeerHandleErrorZ.Result_NonePeerHandleErrorZ_Err) { + peer.descriptor.disconnect_socket(); + } } catch (IOException ignored) { } } continue; // There is no attachment so the rest of the loop is useless @@ -187,8 +202,8 @@ public class NioPeerHandler { if (key.isValid() && (key.interestOps() & SelectionKey.OP_WRITE) != 0 && key.isWritable()) { Result_NonePeerHandleErrorZ res = this.peer_manager.write_buffer_space_avail(peer.descriptor); if (res instanceof Result_NonePeerHandleErrorZ.Result_NonePeerHandleErrorZ_Err) { - key.channel().close(); key.cancel(); + key.channel().close(); } } if (key.isValid() && (key.interestOps() & SelectionKey.OP_READ) != 0 && key.isReadable()) { @@ -197,6 +212,7 @@ public class NioPeerHandler { if (read == -1) { this.peer_manager.socket_disconnected(peer.descriptor); key.cancel(); + key.channel().close(); // This may throw, we read -1 so the channel should already be closed, but do this to be safe } else if (read > 0) { ((Buffer)buf).flip(); // This code is quite hot during initial network graph sync, so we go a ways out of @@ -216,20 +232,20 @@ public class NioPeerHandler { buf.get(read_bytes, 0, read); long read_result_pointer = bindings.PeerManager_read_event( peer_manager_raw_pointer, peer.descriptor_raw_pointer, read_bytes); - if (bindings.LDKCResult_boolPeerHandleErrorZ_result_ok(read_result_pointer)) { - if (bindings.LDKCResult_boolPeerHandleErrorZ_get_ok(read_result_pointer)) { + if (bindings.CResult_boolPeerHandleErrorZ_is_ok(read_result_pointer)) { + if (bindings.CResult_boolPeerHandleErrorZ_get_ok(read_result_pointer)) { key.interestOps(key.interestOps() & (~SelectionKey.OP_READ)); } } else { - key.channel().close(); key.cancel(); + key.channel().close(); } bindings.CResult_boolPeerHandleErrorZ_free(read_result_pointer); } } } catch (IOException ignored) { - try { key.channel().close(); } catch (IOException ignored2) { } key.cancel(); + try { key.channel().close(); } catch (IOException ignored2) { } peer_manager.socket_disconnected(peer.descriptor); } } catch (CancelledKeyException e) { @@ -263,24 +279,35 @@ public class NioPeerHandler { */ public void connect(byte[] their_node_id, SocketAddress remote, int timeout_ms) throws IOException { SocketChannel chan = SocketChannel.open(); - chan.configureBlocking(false); - Selector open_selector = Selector.open(); - chan.register(open_selector, SelectionKey.OP_CONNECT); - if (!chan.connect(remote)) { - open_selector.select(timeout_ms); + boolean connected; + try { + chan.configureBlocking(false); + Selector open_selector = Selector.open(); + chan.register(open_selector, SelectionKey.OP_CONNECT); + if (!chan.connect(remote)) { + open_selector.select(timeout_ms); + } + connected = chan.finishConnect(); + } catch (IOException e) { + try { chan.close(); } catch (IOException _e) { } + throw e; } - if (!chan.finishConnect()) { // Note that this may throw its own IOException if we failed for another reason + if (!connected) { + try { chan.close(); } catch (IOException _e) { } throw new IOException("Timed out"); } Peer peer = setup_socket(chan); - Result_CVec_u8ZPeerHandleErrorZ res = this.peer_manager.new_outbound_connection(their_node_id, peer.descriptor); + do_selector_action(() -> peer.key = chan.register(this.selector, SelectionKey.OP_READ, peer)); + Result_CVec_u8ZPeerHandleErrorZ res = this.peer_manager.new_outbound_connection(their_node_id, peer.descriptor, get_netaddr_from_sockaddr(remote)); if (res instanceof Result_CVec_u8ZPeerHandleErrorZ.Result_CVec_u8ZPeerHandleErrorZ_OK) { byte[] initial_bytes = ((Result_CVec_u8ZPeerHandleErrorZ.Result_CVec_u8ZPeerHandleErrorZ_OK) res).res; if (chan.write(ByteBuffer.wrap(initial_bytes)) != initial_bytes.length) { + peer.descriptor.disconnect_socket(); + this.peer_manager.socket_disconnected(peer.descriptor); throw new IOException("We assume TCP socket buffer is at least a single packet in length"); } - do_selector_action(() -> peer.key = chan.register(this.selector, SelectionKey.OP_READ, peer)); } else { + peer.descriptor.disconnect_socket(); throw new IOException("LDK rejected outbound connection. This likely shouldn't ever happen."); } } @@ -318,10 +345,12 @@ public class NioPeerHandler { } /** - * Interrupt the background thread, stopping all peer handling. Disconnection events to the PeerHandler are not made, - * potentially leaving the PeerHandler in an inconsistent state. + * Interrupt the background thread, stopping all peer handling. + * + * After this method is called, the behavior of future calls to methods on this NioPeerHandler are undefined. */ public void interrupt() { + this.peer_manager.disconnect_all_peers(); shutdown = true; selector.wakeup(); try { @@ -333,8 +362,10 @@ public class NioPeerHandler { for (ServerSocketChannel chan : listening_sockets) { chan.close(); } + listening_sockets.clear(); } catch (IOException ignored) {} } + Reference.reachabilityFence(this.peer_manager); // Almost certainly overkill, but no harm in it } /**