X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Forg%2Fldk%2Fbatteries%2FNioPeerHandler.java;h=2d38726450b435c52c9ec63f07383f593c9a1e30;hb=1ef2dc8b8a3e0e244ad82ac08e286df489eca4b7;hp=e0dd83e9edc570b542716ae88caacb4e45b58940;hpb=a937cedf387378f24883ab14c5b52a4d13be58bc;p=ldk-java diff --git a/src/main/java/org/ldk/batteries/NioPeerHandler.java b/src/main/java/org/ldk/batteries/NioPeerHandler.java index e0dd83e9..2d387264 100644 --- a/src/main/java/org/ldk/batteries/NioPeerHandler.java +++ b/src/main/java/org/ldk/batteries/NioPeerHandler.java @@ -6,9 +6,8 @@ import org.ldk.structs.*; import java.io.IOException; import java.lang.reflect.Field; import java.lang.ref.Reference; +import java.net.*; import java.util.LinkedList; -import java.net.SocketAddress; -import java.net.StandardSocketOptions; import java.nio.Buffer; import java.nio.ByteBuffer; import java.nio.channels.*; @@ -87,7 +86,7 @@ public class NioPeerHandler { (peer.key.interestOps() | SelectionKey.OP_READ) & (~SelectionKey.OP_WRITE))); } return written; - } catch (IOException e) { + } catch (IOException|CancelledKeyException ignored) { // Most likely the socket is disconnected, let the background thread handle it. return 0; } @@ -97,7 +96,7 @@ public class NioPeerHandler { public void disconnect_socket() { try { do_selector_action(() -> { - peer.key.cancel(); + try { peer.key.cancel(); } catch (CancelledKeyException ignored) {} peer.key.channel().close(); }); } catch (IOException ignored) { } @@ -121,6 +120,19 @@ public class NioPeerHandler { long socket_id; volatile boolean shutdown = false; + private static Option_NetAddressZ get_netaddr_from_sockaddr(SocketAddress sockaddr) { + if (sockaddr instanceof InetSocketAddress) { + InetAddress addr = ((InetSocketAddress) sockaddr).getAddress(); + short port = (short) ((InetSocketAddress) sockaddr).getPort(); + if (addr instanceof Inet4Address) { + return Option_NetAddressZ.some(NetAddress.ipv4(addr.getAddress(), port)); + } else if (addr instanceof Inet6Address) { + return Option_NetAddressZ.some(NetAddress.ipv6(addr.getAddress(), port)); + } + } + return Option_NetAddressZ.none(); + } + /** * Constructs a new peer handler, spawning a thread to monitor for socket events. * @@ -176,7 +188,8 @@ public class NioPeerHandler { try { Peer peer = setup_socket(chan); peer.key = chan.register(this.selector, SelectionKey.OP_READ, peer); - Result_NonePeerHandleErrorZ res = this.peer_manager.new_inbound_connection(peer.descriptor); + Option_NetAddressZ netaddr = get_netaddr_from_sockaddr(chan.getRemoteAddress()); + Result_NonePeerHandleErrorZ res = this.peer_manager.new_inbound_connection(peer.descriptor, netaddr); if (res instanceof Result_NonePeerHandleErrorZ.Result_NonePeerHandleErrorZ_Err) { peer.descriptor.disconnect_socket(); } @@ -189,8 +202,8 @@ public class NioPeerHandler { if (key.isValid() && (key.interestOps() & SelectionKey.OP_WRITE) != 0 && key.isWritable()) { Result_NonePeerHandleErrorZ res = this.peer_manager.write_buffer_space_avail(peer.descriptor); if (res instanceof Result_NonePeerHandleErrorZ.Result_NonePeerHandleErrorZ_Err) { - key.channel().close(); key.cancel(); + key.channel().close(); } } if (key.isValid() && (key.interestOps() & SelectionKey.OP_READ) != 0 && key.isReadable()) { @@ -199,6 +212,7 @@ public class NioPeerHandler { if (read == -1) { this.peer_manager.socket_disconnected(peer.descriptor); key.cancel(); + key.channel().close(); // This may throw, we read -1 so the channel should already be closed, but do this to be safe } else if (read > 0) { ((Buffer)buf).flip(); // This code is quite hot during initial network graph sync, so we go a ways out of @@ -223,15 +237,15 @@ public class NioPeerHandler { key.interestOps(key.interestOps() & (~SelectionKey.OP_READ)); } } else { - key.channel().close(); key.cancel(); + key.channel().close(); } bindings.CResult_boolPeerHandleErrorZ_free(read_result_pointer); } } } catch (IOException ignored) { - try { key.channel().close(); } catch (IOException ignored2) { } key.cancel(); + try { key.channel().close(); } catch (IOException ignored2) { } peer_manager.socket_disconnected(peer.descriptor); } } catch (CancelledKeyException e) { @@ -265,18 +279,26 @@ public class NioPeerHandler { */ public void connect(byte[] their_node_id, SocketAddress remote, int timeout_ms) throws IOException { SocketChannel chan = SocketChannel.open(); - chan.configureBlocking(false); - Selector open_selector = Selector.open(); - chan.register(open_selector, SelectionKey.OP_CONNECT); - if (!chan.connect(remote)) { - open_selector.select(timeout_ms); + boolean connected; + try { + chan.configureBlocking(false); + Selector open_selector = Selector.open(); + chan.register(open_selector, SelectionKey.OP_CONNECT); + if (!chan.connect(remote)) { + open_selector.select(timeout_ms); + } + connected = chan.finishConnect(); + } catch (IOException e) { + try { chan.close(); } catch (IOException _e) { } + throw e; } - if (!chan.finishConnect()) { // Note that this may throw its own IOException if we failed for another reason + if (!connected) { + try { chan.close(); } catch (IOException _e) { } throw new IOException("Timed out"); } Peer peer = setup_socket(chan); do_selector_action(() -> peer.key = chan.register(this.selector, SelectionKey.OP_READ, peer)); - Result_CVec_u8ZPeerHandleErrorZ res = this.peer_manager.new_outbound_connection(their_node_id, peer.descriptor); + Result_CVec_u8ZPeerHandleErrorZ res = this.peer_manager.new_outbound_connection(their_node_id, peer.descriptor, get_netaddr_from_sockaddr(remote)); if (res instanceof Result_CVec_u8ZPeerHandleErrorZ.Result_CVec_u8ZPeerHandleErrorZ_OK) { byte[] initial_bytes = ((Result_CVec_u8ZPeerHandleErrorZ.Result_CVec_u8ZPeerHandleErrorZ_OK) res).res; if (chan.write(ByteBuffer.wrap(initial_bytes)) != initial_bytes.length) { @@ -323,10 +345,12 @@ public class NioPeerHandler { } /** - * Interrupt the background thread, stopping all peer handling. Disconnection events to the PeerHandler are not made, - * potentially leaving the PeerHandler in an inconsistent state. + * Interrupt the background thread, stopping all peer handling. + * + * After this method is called, the behavior of future calls to methods on this NioPeerHandler are undefined. */ public void interrupt() { + this.peer_manager.disconnect_all_peers(); shutdown = true; selector.wakeup(); try { @@ -338,6 +362,7 @@ public class NioPeerHandler { for (ServerSocketChannel chan : listening_sockets) { chan.close(); } + listening_sockets.clear(); } catch (IOException ignored) {} } Reference.reachabilityFence(this.peer_manager); // Almost certainly overkill, but no harm in it