changeset 53706:0b36729560c6 fibers

Sync up NioSocket with changes from sandbox
author alanb
date Thu, 31 Jan 2019 20:41:26 +0000
parents db2db43cceef
children da03295e6bf1
files src/java.base/share/classes/sun/nio/ch/NioSocketImpl.java
diffstat 1 files changed, 81 insertions(+), 53 deletions(-) [+]
line wrap: on
line diff
--- a/src/java.base/share/classes/sun/nio/ch/NioSocketImpl.java	Wed Jan 30 16:10:42 2019 +0000
+++ b/src/java.base/share/classes/sun/nio/ch/NioSocketImpl.java	Thu Jan 31 20:41:26 2019 +0000
@@ -170,7 +170,7 @@
      * specified waiting time.
      * @throws IOException if an I/O error occurs of the fiber is cancelled
      */
-    private void park(int event, long nanos) throws IOException {
+    private void park(FileDescriptor fd, int event, long nanos) throws IOException {
         Object strand = Strands.currentStrand();
         if (PollerProvider.available() && (strand instanceof Fiber)) {
             int fdVal = fdVal(fd);
@@ -210,8 +210,8 @@
      * ready for I/O or is asynchronously closed.
      * @throws IOException if an I/O error occurs
      */
-    private void park(int event) throws IOException {
-        park(event, 0);
+    private void park(FileDescriptor fd, int event) throws IOException {
+        park(fd, event, 0);
     }
 
     /**
@@ -219,12 +219,12 @@
      * strand is a fiber or a timeout is specified.
      * @throws IOException if there is an I/O error changing the blocking mode
      */
-    private void maybeConfigureNonBlocking(FileDescriptor fd, int timeout)
+    private void configureNonBlockingIfNeeded(FileDescriptor fd, int timeout)
         throws IOException
     {
-        assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread();
         if (!nonBlocking
-                && (timeout > 0 || Strands.currentStrand() instanceof Fiber)) {
+            && (timeout > 0 || Strands.currentStrand() instanceof Fiber)) {
+            assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread();
             IOUtil.configureBlocking(fd, false);
             nonBlocking = true;
         }
@@ -238,7 +238,6 @@
         synchronized (stateLock) {
             ensureOpenAndConnected();
             readerThread = NativeThread.current();
-            assert fd != null;
             return fd;
         }
     }
@@ -259,31 +258,49 @@
     }
 
     /**
-     * Reads bytes from the socket into the given buffer.
+     * Try to read bytes from the socket into the given byte array.
+     */
+    private int tryRead(FileDescriptor fd, byte[] b, int off, int len)
+        throws IOException
+    {
+        ByteBuffer dst = Util.getTemporaryDirectBuffer(len);
+        assert dst.position() == 0;
+        try {
+            int n = nd.read(fd, ((DirectBuffer)dst).address(), len);
+            if (n > 0) {
+                dst.get(b, off, n);
+            }
+            return n;
+        } finally{
+            Util.offerFirstTemporaryDirectBuffer(dst);
+        }
+    }
+
+    /**
+     * Reads bytes from the socket into the given byte array.
+     * @return the number of bytes read
      * @throws IOException if the socket is closed or an I/O occurs
      * @throws SocketTimeoutException if the read timeout elapses
      */
-    private int read(ByteBuffer dst) throws IOException {
+    private int read(byte[] b, int off, int len) throws IOException {
         readLock.lock();
         try {
+            int timeout = this.timeout;
             int n = 0;
             FileDescriptor fd = beginRead();
             try {
-                if (isInputClosed) {
+                if (isInputClosed)
                     return IOStatus.EOF;
-                }
-                int timeout = this.timeout;
-                maybeConfigureNonBlocking(fd, timeout);
-                n = IOUtil.read(fd, dst, -1, nd);
+                configureNonBlockingIfNeeded(fd, timeout);
+                n = tryRead(fd, b, off, len);
                 if (IOStatus.okayToRetry(n) && isOpen()) {
                     if (timeout > 0) {
                         // read with timeout
-                        assert nonBlocking;
                         long nanos = NANOSECONDS.convert(timeout, TimeUnit.MILLISECONDS);
                         do {
                             long startTime = System.nanoTime();
-                            park(Net.POLLIN, nanos);
-                            n = IOUtil.read(fd, dst, -1, nd);
+                            park(fd, Net.POLLIN, nanos);
+                            n = tryRead(fd, b, off, len);
                             if (n == IOStatus.UNAVAILABLE) {
                                 nanos -= System.nanoTime() - startTime;
                                 if (nanos <= 0)
@@ -293,8 +310,8 @@
                     } else {
                         // read, no timeout
                         do {
-                            park(Net.POLLIN);
-                            n = IOUtil.read(fd, dst, -1, nd);
+                            park(fd, Net.POLLIN);
+                            n = tryRead(fd, b, off, len);
                         } while (IOStatus.okayToRetry(n) && isOpen());
                     }
                 }
@@ -315,7 +332,6 @@
         synchronized (stateLock) {
             ensureOpenAndConnected();
             writerThread = NativeThread.current();
-            assert fd != null;
             return fd;
         }
     }
@@ -336,20 +352,37 @@
     }
 
     /**
-     * Writes a sequence of bytes to this socket from the given buffer.
+     * Try to write a sequence of bytes to this socket from the given byte array
+     */
+    private int tryWrite(FileDescriptor fd, byte[] b, int off, int len)
+        throws IOException
+    {
+        ByteBuffer src = Util.getTemporaryDirectBuffer(len);
+        assert src.position() == 0;
+        try {
+            src.put(b, off, len);
+            return nd.write(fd, ((DirectBuffer)src).address(), len);
+        } finally {
+            Util.offerFirstTemporaryDirectBuffer(src);
+        }
+    }
+
+    /**
+     * Writes a sequence of bytes to this socket from the given byte array.
+     * @return the number of bytes written
      * @throws IOException if the socket is closed or an I/O occurs
      */
-    private int write(ByteBuffer dst) throws IOException {
+    private int write(byte[] b, int off, int len) throws IOException {
         writeLock.lock();
         try {
             int n = 0;
             FileDescriptor fd = beginWrite();
             try {
-                maybeConfigureNonBlocking(fd, 0);
-                n = IOUtil.write(fd, dst, -1, nd);
+                configureNonBlockingIfNeeded(fd, 0);
+                n = tryWrite(fd, b, off, len);
                 while (IOStatus.okayToRetry(n) && isOpen()) {
-                    park(Net.POLLOUT);
-                    n = IOUtil.write(fd, dst, -1, nd);
+                    park(fd, Net.POLLOUT);
+                    n = tryWrite(fd, b, off, len);
                 }
                 return n;
             } finally {
@@ -490,7 +523,6 @@
             this.port = port;
 
             readerThread = NativeThread.current();
-            assert fd != null;
             return fd;
         }
     }
@@ -540,7 +572,7 @@
                 boolean connected = false;
                 FileDescriptor fd = beginConnect(address, port);
                 try {
-                    maybeConfigureNonBlocking(fd, millis);
+                    configureNonBlockingIfNeeded(fd, millis);
                     int n = Net.connect(fd, address, port);
                     if (IOStatus.okayToRetry(n) && isOpen()) {
                         if (millis > 0) {
@@ -549,7 +581,7 @@
                             long nanos = NANOSECONDS.convert(millis, MILLISECONDS);
                             do {
                                 long startTime = System.nanoTime();
-                                park(Net.POLLOUT, nanos);
+                                park(fd, Net.POLLOUT, nanos);
                                 n = Net.pollConnectNow(fd);
                                 if (n == 0) {
                                     nanos -= System.nanoTime() - startTime;
@@ -560,7 +592,7 @@
                         } else {
                             // connect, no timeout
                             do {
-                                park(Net.POLLOUT);
+                                park(fd, Net.POLLOUT);
                                 n = Net.pollConnectNow(fd);
                             } while (n == 0 && isOpen());
                         }
@@ -631,7 +663,6 @@
             if (localport == 0)
                 throw new SocketException("Not bound");
             readerThread = NativeThread.current();
-            assert fd != null;
             return fd;
         }
     }
@@ -664,7 +695,7 @@
             FileDescriptor fd = beginAccept();
             try {
                 int timeout = this.timeout;
-                maybeConfigureNonBlocking(fd, timeout);
+                configureNonBlockingIfNeeded(fd, timeout);
                 n = ServerSocketChannelImpl.accept0(fd, newfd, isaa);
                 if (IOStatus.okayToRetry(n) && isOpen()) {
                     if (timeout > 0) {
@@ -673,7 +704,7 @@
                         long nanos = NANOSECONDS.convert(timeout, TimeUnit.MILLISECONDS);
                         do {
                             long startTime = System.nanoTime();
-                            park(Net.POLLIN, nanos);
+                            park(fd, Net.POLLIN, nanos);
                             n = ServerSocketChannelImpl.accept0(fd, newfd, isaa);
                             if (n == IOStatus.UNAVAILABLE) {
                                 nanos -= System.nanoTime() - startTime;
@@ -684,7 +715,7 @@
                     } else {
                         // accept, no timeout
                         do {
-                            park(Net.POLLIN);
+                            park(fd, Net.POLLIN);
                             n = ServerSocketChannelImpl.accept0(fd, newfd, isaa);
                         } while (IOStatus.okayToRetry(n) && isOpen());
                     }
@@ -732,7 +763,7 @@
     @Override
     protected InputStream getInputStream() {
         return new InputStream() {
-            private volatile boolean eof;
+            private volatile boolean eof;  // to emulate legacy SocketInputStream
             @Override
             public int read() throws IOException {
                 byte[] a = new byte[1];
@@ -740,18 +771,17 @@
                 return (n > 0) ? (a[0] & 0xff) : -1;
             }
             @Override
-            public int read(byte b[], int off, int len) throws IOException {
+            public int read(byte[] b, int off, int len) throws IOException {
                 Objects.checkFromIndexSize(off, len, b.length);
                 if (eof) {
-                    return -1; // legacy SocketInputStream behavior
+                    return -1; // return -1, even if socket is closed
                 } else if (len == 0) {
-                    return 0;
+                    return 0;  // return 0, even if socket is closed
                 } else {
                     try {
                         // read up to MAX_BUFFER_SIZE bytes
                         int size = Math.min(len, MAX_BUFFER_SIZE);
-                        ByteBuffer dst = ByteBuffer.wrap(b, off, size);
-                        int n = NioSocketImpl.this.read(dst);
+                        int n = NioSocketImpl.this.read(b, off, size);
                         if (n == -1)
                             eof = true;
                         return n;
@@ -782,20 +812,18 @@
                 write(a, 0, 1);
             }
             @Override
-            public void write(byte b[], int off, int len) throws IOException {
+            public void write(byte[] b, int off, int len) throws IOException {
                 Objects.checkFromIndexSize(off, len, b.length);
                 if (len > 0) {
                     try {
-                        ByteBuffer src = ByteBuffer.wrap(b, off, len);
-                        int end = src.limit();
-                        int pos;
-                        // write up to MAX_BUFFER_SIZE bytes at a time
-                        while ((pos = src.position()) < end) {
+                        int pos = off;
+                        int end = off + len;
+                        while (pos < end) {
+                            // write up to MAX_BUFFER_SIZE bytes
                             int size = Math.min((end - pos), MAX_BUFFER_SIZE);
-                            src.limit(pos + size);
-                            NioSocketImpl.this.write(src);
+                            int n = NioSocketImpl.this.write(b, pos, size);
+                            pos += n;
                         }
-                    assert src.limit() == end && src.remaining() == 0;
                     } catch (IOException ioe) {
                         throw new SocketException(ioe.getMessage());
                     }
@@ -1147,7 +1175,7 @@
             int n = 0;
             FileDescriptor fd = beginWrite();
             try {
-                maybeConfigureNonBlocking(fd, 0);
+                configureNonBlockingIfNeeded(fd, 0);
                 do {
                     n = Net.sendOOB(fd, (byte) data);
                 } while (n == IOStatus.INTERRUPTED && isOpen());
@@ -1179,7 +1207,7 @@
                 throw new InternalError(e);
             }
         }
-        
+
         private final FileDescriptor fd;
         private final boolean stream;
         private volatile boolean closed;
@@ -1195,7 +1223,7 @@
             CleanerFactory.cleaner().register(impl, closer);
             return closer;
         }
-        
+
         @Override
         public void run() {
             if (CLOSED.compareAndSet(this, false, true)) {
@@ -1205,7 +1233,7 @@
                     throw new RuntimeException(ioe);
                 } finally {
                     if (!stream) {
-                        // decrement 
+                        // decrement
                         ResourceManager.afterUdpClose();
                     }
                 }