changeset 53283:afc0109176f8 fibers

NioSocketImpl cleanup and alignment with legacy PlainSocketImpl
author alanb
date Thu, 10 Jan 2019 14:01:14 +0000
parents dc44aabf8a69
children 4740bbb9604c c0b3cd105f73
files src/java.base/share/classes/java/net/ServerSocket.java src/java.base/share/classes/java/net/Socket.java src/java.base/share/classes/sun/nio/ch/NioSocketImpl.java src/java.base/share/classes/sun/nio/ch/ServerSocketChannelImpl.java
diffstat 4 files changed, 134 insertions(+), 50 deletions(-) [+]
line wrap: on
line diff
--- a/src/java.base/share/classes/java/net/ServerSocket.java	Wed Jan 09 20:11:24 2019 +0000
+++ b/src/java.base/share/classes/java/net/ServerSocket.java	Thu Jan 10 14:01:14 2019 +0000
@@ -297,7 +297,7 @@
         } else {
             // No need to do a checkOldImpl() here, we know it's an up to date
             // SocketImpl!
-            impl = new NioSocketImpl();
+            impl = new NioSocketImpl(true);
         }
         if (impl != null)
             impl.setServerSocket(this);
@@ -556,9 +556,6 @@
             si.fd = new FileDescriptor();
             getImpl().accept(si);
 
-            // FIXME: disable Cleaner for now
-            //SocketCleanable.register(si.fd);   // raw fd has been set
-
             SecurityManager security = System.getSecurityManager();
             if (security != null) {
                 security.checkAccept(si.getInetAddress().getHostAddress(),
--- a/src/java.base/share/classes/java/net/Socket.java	Wed Jan 09 20:11:24 2019 +0000
+++ b/src/java.base/share/classes/java/net/Socket.java	Thu Jan 10 14:01:14 2019 +0000
@@ -144,7 +144,7 @@
         } else {
             if (p == Proxy.NO_PROXY) {
                 if (factory == null) {
-                    impl = new NioSocketImpl();
+                    impl = new NioSocketImpl(false);
                     impl.setSocket(this);
                 } else
                     setImpl();
@@ -503,7 +503,7 @@
         } else {
             // No need to do a checkOldImpl() here, we know it's an up to date
             // SocketImpl!
-            impl = new NioSocketImpl();
+            impl = new NioSocketImpl(false);
         }
         if (impl != null)
             impl.setSocket(this);
--- a/src/java.base/share/classes/sun/nio/ch/NioSocketImpl.java	Wed Jan 09 20:11:24 2019 +0000
+++ b/src/java.base/share/classes/sun/nio/ch/NioSocketImpl.java	Thu Jan 10 14:01:14 2019 +0000
@@ -29,6 +29,8 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.lang.invoke.MethodHandles;
+import java.lang.invoke.VarHandle;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.ProtocolFamily;
@@ -48,7 +50,9 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantLock;
 
+import jdk.internal.access.SharedSecrets;
 import jdk.internal.misc.Strands;
+import jdk.internal.ref.CleanerFactory;
 import sun.net.NetHooks;
 import sun.net.ResourceManager;
 import sun.net.ext.ExtendedSocketOptions;
@@ -63,13 +67,14 @@
  *
  * This implementation attempts to be compatible with legacy PlainSocketImpl,
  * including throwing exceptions that are not specified by SocketImpl.
- *
- * This implementation is not yet integrated with the Cleaner mechanism.
  */
 
 public class NioSocketImpl extends SocketImpl {
     private static final NativeDispatcher nd = new SocketDispatcher();
 
+    // true if this is a SocketImpl for a ServerSocket
+    private final boolean server;
+
     // Lock held when reading or writing
     private final ReentrantLock readLock = new ReentrantLock();
     private final ReentrantLock writeLock = new ReentrantLock();
@@ -84,21 +89,27 @@
     private static final int ST_CLOSED = 5;
     private volatile int state;  // need stateLock to change
 
-    // protected by stateLock
+    // set by SocketImpl.create, protected by stateLock
+    private boolean stream;
+    private FileDescriptorCloser closer;
+
+    // used by connect/read/write/accept, protected by stateLock
     private long readerThread;
     private long writerThread;
-    private int fdVal;
-    private boolean stream;
-    private int timeout;  // read or accept timeout in millis
+
+    // read or accept timeout in millis, protected by stateLock
+    private int timeout;
 
     // flags to indicate if the connection is shutdown for input and output
     private volatile boolean isInputClosed;
     private volatile boolean isOutputClosed;
 
     /**
-     * Creates a instance of this SocketImpl
+     * Creates a instance of this SocketImpl.
      */
-    public NioSocketImpl() { }
+    public NioSocketImpl(boolean server) {
+        this.server = server;
+    }
 
     /**
      * Returns true if the socket is open.
@@ -131,9 +142,10 @@
      * socket is ready for I/O, or asynchronously closed, for up to the
      * specified waiting time, unless the permit is available.
      */
-    private void park(int event, long nanos) throws IOException {
+    private void park(FileDescriptor fd, int event, long nanos) throws IOException {
         Object strand = Strands.currentStrand();
         if (PollerProvider.available() && (strand instanceof Fiber)) {
+            int fdVal = fdVal(fd);
             Poller.register(strand, fdVal, event);
             if (isOpen()) {
                 try {
@@ -169,10 +181,12 @@
      * Marks the beginning of a read operation that might block.
      * @throws SocketException if the socket is closed or not connected
      */
-    private void beginRead() throws SocketException {
+    private FileDescriptor beginRead() throws SocketException {
         synchronized (stateLock) {
             ensureOpenAndConnected();
             readerThread = NativeThread.current();
+            assert fd != null;
+            return fd;
         }
     }
 
@@ -200,7 +214,7 @@
         readLock.lock();
         try {
             int n = 0;
-            beginRead();
+            FileDescriptor fd = beginRead();
             try {
                 if (isInputClosed)
                     return IOStatus.EOF;
@@ -211,7 +225,7 @@
                         // read with timeout
                         do {
                             long startTime = System.nanoTime();
-                            park(Net.POLLIN, nanos);
+                            park(fd, Net.POLLIN, nanos);
                             n = IOUtil.read(fd, dst, -1, nd);
                             if (n == IOStatus.UNAVAILABLE) {
                                 nanos -= System.nanoTime() - startTime;
@@ -222,7 +236,7 @@
                     } else {
                         // read, no timeout
                         do {
-                            park(Net.POLLIN, 0);
+                            park(fd, Net.POLLIN, 0);
                             n = IOUtil.read(fd, dst, -1, nd);
                         } while (n == IOStatus.UNAVAILABLE && isOpen());
                     }
@@ -240,10 +254,12 @@
      * Marks the beginning of a write operation that might block.
      * @throws SocketException if the socket is closed or not connected
      */
-    private void beginWrite() throws SocketException {
+    private FileDescriptor beginWrite() throws SocketException {
         synchronized (stateLock) {
             ensureOpenAndConnected();
             writerThread = NativeThread.current();
+            assert fd != null;
+            return fd;
         }
     }
 
@@ -270,11 +286,11 @@
         writeLock.lock();
         try {
             int n = 0;
-            beginWrite();
+            FileDescriptor fd = beginWrite();
             try {
                 n = IOUtil.write(fd, dst, -1, nd);
                 while (n == IOStatus.UNAVAILABLE && isOpen()) {
-                    park(Net.POLLOUT, 0);
+                    park(fd, Net.POLLOUT, 0);
                     n = IOUtil.write(fd, dst, -1, nd);
                 }
                 return n;
@@ -298,7 +314,12 @@
                 ResourceManager.beforeUdpCreate();
             FileDescriptor fd = null;
             try {
-                fd = Net.socket(stream);
+                if (server) {
+                    assert stream;
+                    fd = Net.serverSocket(true);
+                } else {
+                    fd = Net.socket(stream);
+                }
                 IOUtil.configureBlocking(fd, false);
             } catch (IOException ioe) {
                 if (!stream)
@@ -308,8 +329,8 @@
                 throw ioe;
             }
             this.fd = fd;
-            this.fdVal = IOUtil.fdVal(fd);
             this.stream = stream;
+            this.closer = FileDescriptorCloser.create(this);
             this.state = ST_UNCONNECTED;
         }
     }
@@ -318,13 +339,16 @@
      * Marks the beginning of a connect operation that might block.
      * @throws SocketException if the socket is closed or already connected
      */
-    private void beginConnect(InetAddress address, int port) throws IOException {
+    private FileDescriptor beginConnect(InetAddress address, int port)
+        throws IOException
+    {
         synchronized (stateLock) {
             int state = this.state;
             if (state >= ST_CLOSING)
                 throw new SocketException("Socket closed");
-            if (state != ST_UNCONNECTED)
+            if (state == ST_CONNECTED)
                 throw new SocketException("Already connected");
+            assert state == ST_UNCONNECTED;
             this.state = ST_CONNECTING;
 
             // invoke beforeTcpConnect hook if not already bound
@@ -337,6 +361,8 @@
             this.port = port;
 
             readerThread = NativeThread.current();
+            assert fd != null;
+            return fd;
         }
     }
 
@@ -385,7 +411,7 @@
                 try {
                     boolean connected = false;
                     try {
-                        beginConnect(address, port);
+                        FileDescriptor fd = beginConnect(address, port);
                         int n = Net.connect(fd, address, port);
                         if (n == IOStatus.UNAVAILABLE && isOpen()) {
                             long nanos = NANOSECONDS.convert(millis, MILLISECONDS);
@@ -393,7 +419,7 @@
                                 // connect with timeout
                                 do {
                                     long startTime = System.nanoTime();
-                                    park(Net.POLLOUT, nanos);
+                                    park(fd, Net.POLLOUT, nanos);
                                     n = SocketChannelImpl.checkConnect(fd, false);
                                     if (n == IOStatus.UNAVAILABLE) {
                                         nanos -= System.nanoTime() - startTime;
@@ -404,7 +430,7 @@
                             } else {
                                 // connect, no timeout
                                 do {
-                                    park(Net.POLLOUT, 0);
+                                    park(fd, Net.POLLOUT, 0);
                                     n = SocketChannelImpl.checkConnect(fd, false);
                                 } while (n == IOStatus.UNAVAILABLE && isOpen());
                             }
@@ -450,7 +476,7 @@
             Net.bind(fd, host, port);
             InetSocketAddress localAddress = Net.localAddress(fd);
 
-            address = localAddress.getAddress(); // needed by ServerSocket
+            address = localAddress.getAddress();
             localport = localAddress.getPort();
         }
     }
@@ -469,12 +495,16 @@
      * Marks the beginning of an accept operation that might block.
      * @throws SocketException if the socket is closed
      */
-    private void beginAccept() throws  SocketException {
+    private FileDescriptor beginAccept() throws  SocketException {
         synchronized (stateLock) {
             ensureOpen();
+            if (!stream)
+                throw new SocketException("Not a stream socket");
             if (localport == 0)
                 throw new SocketException("Not bound");
             readerThread = NativeThread.current();
+            assert fd != null;
+            return fd;
         }
     }
 
@@ -505,16 +535,16 @@
 
             int n = 0;
             try {
-                beginAccept();
-                n = ServerSocketChannelImpl.accept0(this.fd, si.fd, isaa);
+                FileDescriptor fd = beginAccept();
+                n = ServerSocketChannelImpl.accept0(fd, si.fd, isaa);
                 if (n == IOStatus.UNAVAILABLE && isOpen()) {
                     long nanos = NANOSECONDS.convert(timeout, TimeUnit.MILLISECONDS);
                     if (nanos > 0) {
                         // accept with timeout
                         do {
                             long startTime = System.nanoTime();
-                            park(Net.POLLIN, nanos);
-                            n = ServerSocketChannelImpl.accept0(this.fd, si.fd, isaa);
+                            park(fd, Net.POLLIN, nanos);
+                            n = ServerSocketChannelImpl.accept0(fd, si.fd, isaa);
                             if (n == IOStatus.UNAVAILABLE) {
                                 nanos -= System.nanoTime() - startTime;
                                 if (nanos <= 0)
@@ -524,8 +554,8 @@
                     } else {
                         // accept, no timeout
                         do {
-                            park(Net.POLLIN, 0);
-                            n = ServerSocketChannelImpl.accept0(this.fd, si.fd, isaa);
+                            park(fd, Net.POLLIN, 0);
+                            n = ServerSocketChannelImpl.accept0(fd, si.fd, isaa);
                         } while (n == IOStatus.UNAVAILABLE && isOpen());
                     }
                 }
@@ -536,14 +566,15 @@
 
             // set fields in SocketImpl
             synchronized (si.stateLock) {
-                assert si.state == ST_NEW;
+
                 try {
                     IOUtil.configureBlocking(si.fd, false);
                 } catch (IOException ioe) {
                     nd.close(si.fd);
                     throw ioe;
                 }
-                si.fdVal = IOUtil.fdVal(si.fd);
+                si.stream = true;
+                si.closer = FileDescriptorCloser.create(si);
                 si.localport = Net.localAddress(si.fd).getPort();
                 si.address = isaa[0].getAddress();
                 si.port = isaa[0].getPort();
@@ -647,7 +678,7 @@
             // unpark and wait for fibers to complete I/O operations
             if (NativeThread.isFiber(readerThread) ||
                     NativeThread.isFiber(writerThread)) {
-                Poller.stopPoll(fdVal);
+                Poller.stopPoll(fdVal(fd));
 
                 while (NativeThread.isFiber(readerThread) ||
                         NativeThread.isFiber(writerThread)) {
@@ -682,12 +713,11 @@
                 }
             }
 
-            state = ST_CLOSED;
+            // close file descriptor
             try {
-                nd.close(fd);
+                closer.run();
             } finally {
-                if (!stream)
-                    ResourceManager.afterUdpClose();
+                state = ST_CLOSED;
             }
         }
 
@@ -882,12 +912,12 @@
     @Override
     protected void shutdownInput() throws IOException {
         synchronized (stateLock) {
-            ensureOpen();
+            ensureOpenAndConnected();
             if (!isInputClosed) {
                 Net.shutdown(fd, Net.SHUT_RD);
                 long reader = readerThread;
                 if (NativeThread.isFiber(reader)) {
-                    Poller.stopPoll(fdVal, Net.POLLIN);
+                    Poller.stopPoll(fdVal(fd), Net.POLLIN);
                 } else if (NativeThread.isKernelThread(reader)) {
                     NativeThread.signal(reader);
                 }
@@ -899,12 +929,12 @@
     @Override
     protected void shutdownOutput() throws IOException {
         synchronized (stateLock) {
-            ensureOpen();
+            ensureOpenAndConnected();
             if (!isOutputClosed) {
                 Net.shutdown(fd, Net.SHUT_WR);
                 long writer = writerThread;
                 if (NativeThread.isFiber(writer)) {
-                    Poller.stopPoll(fdVal, Net.POLLOUT);
+                    Poller.stopPoll(fdVal(fd), Net.POLLOUT);
                 } else if (NativeThread.isKernelThread(writer)) {
                     NativeThread.signal(writer);
                 }
@@ -924,7 +954,7 @@
         try {
             int n = 0;
             try {
-                beginWrite();
+                FileDescriptor fd = beginWrite();
                 n = SocketChannelImpl.sendOutOfBandData(fd, (byte) data);
                 if (n == IOStatus.UNAVAILABLE) {
                     throw new RuntimeException("not implemented");
@@ -936,4 +966,61 @@
             writeLock.unlock();
         }
     }
+
+    /**
+     * A task that closes a SocketImpl's file descriptor. The task runs when the
+     * SocketImpl is explicitly closed and when the SocketImpl becomes phantom
+     * reachable.
+     */
+    private static class FileDescriptorCloser implements Runnable {
+        private static final VarHandle CLOSED;
+        static {
+            try {
+                MethodHandles.Lookup l = MethodHandles.lookup();
+                CLOSED = l.findVarHandle(FileDescriptorCloser.class,
+                                         "closed",
+                                         boolean.class);
+            } catch (Exception e) {
+                throw new InternalError(e);
+            }
+        }
+        
+        private final FileDescriptor fd;
+        private final boolean stream;
+        private volatile boolean closed;
+
+        FileDescriptorCloser(FileDescriptor fd, boolean stream) {
+            this.fd = fd;
+            this.stream = stream;
+        }
+
+        static FileDescriptorCloser create(NioSocketImpl impl) {
+            assert Thread.holdsLock(impl.stateLock);
+            var closer = new FileDescriptorCloser(impl.fd, impl.stream);
+            CleanerFactory.cleaner().register(impl, closer);
+            return closer;
+        }
+        
+        @Override
+        public void run() {
+            if (CLOSED.compareAndSet(this, false, true)) {
+                try {
+                    nd.close(fd);
+                } catch (IOException ioe) {
+                    throw new RuntimeException(ioe);
+                } finally {
+                    if (!stream) {
+                        // decrement 
+                        ResourceManager.afterUdpClose();
+                    }
+                }
+            }
+        }
+    }
+
+    private static int fdVal(FileDescriptor fd) {
+        int fdVal = SharedSecrets.getJavaIOFileDescriptorAccess().get(fd);
+        assert fdVal == IOUtil.fdVal(fd);
+        return fdVal;
+    }
 }
--- a/src/java.base/share/classes/sun/nio/ch/ServerSocketChannelImpl.java	Wed Jan 09 20:11:24 2019 +0000
+++ b/src/java.base/share/classes/sun/nio/ch/ServerSocketChannelImpl.java	Thu Jan 10 14:01:14 2019 +0000
@@ -101,7 +101,7 @@
     ServerSocketChannelImpl(SelectorProvider sp) throws IOException {
         super(sp);
 
-        FileDescriptor fd = Net.socket(true);
+        FileDescriptor fd = Net.serverSocket(true);
         try {
             IOUtil.configureBlocking(fd, false);
         } catch (IOException ioe) {