changeset 53277:3219c64a719a fibers

Allow Socket.connect to be park fiber
author alanb
date Mon, 07 Jan 2019 17:46:35 +0000
parents b2a78e16910f
children a5f73fb9f0c9
files src/java.base/share/classes/java/net/AbstractPlainSocketImpl.java src/java.base/share/classes/sun/nio/ch/SocketChannelImpl.java src/java.base/share/classes/sun/nio/ch/SocketStreams.java test/jdk/java/lang/Fiber/NetSockets.java
diffstat 4 files changed, 163 insertions(+), 38 deletions(-) [+]
line wrap: on
line diff
--- a/src/java.base/share/classes/java/net/AbstractPlainSocketImpl.java	Fri Jan 04 19:52:34 2019 +0000
+++ b/src/java.base/share/classes/java/net/AbstractPlainSocketImpl.java	Mon Jan 07 17:46:35 2019 +0000
@@ -393,32 +393,31 @@
      */
 
     synchronized void doConnect(InetAddress address, int port, int timeout) throws IOException {
+        SocketStreams socketStreams;
         synchronized (fdLock) {
             if (!closePending && (socket == null || !socket.isBound())) {
                 NetHooks.beforeTcpConnect(fd, address, port);
             }
+
+            socketStreams = this.socketStreams;
+            if (socketStreams == null) {
+                this.socketStreams = new SocketStreams(socket, fd, false).readTimeout(timeout);
+                socketStreams = this.socketStreams;
+            }
         }
+
         try {
-            acquireFD();
-            try {
-                socketConnect(address, port, timeout);
-                /* socket may have been closed during poll/select */
-                synchronized (fdLock) {
-                    if (closePending) {
-                        throw new SocketException ("Socket closed");
-                    }
-                }
-                // If we have a ref. to the Socket, then sets the flags
-                // created, bound & connected to true.
-                // This is normally done in Socket.connect() but some
-                // subclasses of Socket may call impl.connect() directly!
-                if (socket != null) {
-                    socket.setBound();
-                    socket.setConnected();
-                }
-            } finally {
-                releaseFD();
+            socketStreams.connect(address, port, timeout);
+
+            // If we have a ref. to the Socket, then sets the flags
+            // created, bound & connected to true.
+            // This is normally done in Socket.connect() but some
+            // subclasses of Socket may call impl.connect() directly!
+            if (socket != null) {
+                socket.setBound();
+                socket.setConnected();
             }
+
         } catch (IOException e) {
             close();
             throw SocketExceptions.of(e, new InetSocketAddress(address, port));
@@ -476,7 +475,7 @@
             if (shut_rd)
                 throw new IOException("Socket input is shutdown");
             if (socketStreams == null)
-                socketStreams = new SocketStreams(socket, fd).readTimeout(timeout);
+                socketStreams = new SocketStreams(socket, fd, true).readTimeout(timeout);
             return socketStreams.inputStream();
         }
     }
@@ -495,7 +494,7 @@
             if (shut_wr)
                 throw new IOException("Socket output is shutdown");
             if (socketStreams == null)
-                socketStreams = new SocketStreams(socket, fd).readTimeout(timeout);
+                socketStreams = new SocketStreams(socket, fd, true).readTimeout(timeout);
             return socketStreams.outputStream();
         }
     }
--- a/src/java.base/share/classes/sun/nio/ch/SocketChannelImpl.java	Fri Jan 04 19:52:34 2019 +0000
+++ b/src/java.base/share/classes/sun/nio/ch/SocketChannelImpl.java	Mon Jan 07 17:46:35 2019 +0000
@@ -1177,7 +1177,7 @@
 
     // -- Native methods --
 
-    private static native int checkConnect(FileDescriptor fd, boolean block)
+    static native int checkConnect(FileDescriptor fd, boolean block)
         throws IOException;
 
     private static native int sendOutOfBandData(FileDescriptor fd, byte data)
--- a/src/java.base/share/classes/sun/nio/ch/SocketStreams.java	Fri Jan 04 19:52:34 2019 +0000
+++ b/src/java.base/share/classes/sun/nio/ch/SocketStreams.java	Mon Jan 07 17:46:35 2019 +0000
@@ -30,6 +30,7 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.net.InetAddress;
 import java.net.SocketException;
 import java.net.SocketTimeoutException;
 import java.nio.ByteBuffer;
@@ -59,9 +60,11 @@
 
     // The stateLock is needed when changing state
     private final Object stateLock = new Object();
-    private static final int ST_CONNECTED = 0;
-    private static final int ST_CLOSING = 1;
-    private static final int ST_CLOSED = 2;
+    private static final int ST_UNCONNECTED = 0;
+    private static final int ST_CONNECTING = 1;
+    private static final int ST_CONNECTED = 2;
+    private static final int ST_CLOSING = 3;
+    private static final int ST_CLOSED = 4;
     private volatile int state;  // need stateLock to change
 
     // protected by stateLock
@@ -78,10 +81,17 @@
     /**
      * Creates a SocketStreams to wrap the given file description.
      */
-    public SocketStreams(Closeable parent, FileDescriptor fd) throws IOException {
+    public SocketStreams(Closeable parent,
+                         FileDescriptor fd,
+                         boolean connected) throws IOException {
         this.parent = Objects.requireNonNull(parent);
         this.fd = fd;
         this.fdVal = IOUtil.fdVal(fd);
+        if (connected) {
+            synchronized (stateLock) {
+                this.state = ST_CONNECTED;
+            }
+        }
         IOUtil.configureBlocking(fd, false);
     }
 
@@ -89,18 +99,30 @@
      * Returns true if the socket is open.
      */
     private boolean isOpen() {
-        return state == ST_CONNECTED;
+        return state < ST_CLOSING;
     }
 
     /**
      * Closes a SocketException if the socket is not open.
      */
     private void ensureOpen() throws SocketException {
-        if (state > ST_CONNECTED)
+        if (state >= ST_CLOSED)
             throw new SocketException("socket closed");
     }
 
     /**
+     * Closes a SocketException if the socket is not open and connected.
+     */
+    private void ensureOpenAndConnected() throws SocketException {
+        int state = this.state;
+        if (state < ST_CONNECTED) {
+            throw new SocketException("not connected");
+        } else if (state > ST_CONNECTED) {
+            throw new SocketException("socket closed");
+        }
+    }
+
+    /**
      * Disables the current thread or fiber for scheduling purposes until this
      * socket is ready for I/O, or asynchronously closed, for up to the
      * specified waiting time, unless the permit is available.
@@ -140,13 +162,50 @@
     }
 
     /**
+     * Marks the beginning of a connect operation that might block.
+     *
+     * @throws SocketException if the socket is closed or already conneced
+     */
+    private void beginConnect() throws IOException {
+        synchronized (stateLock) {
+            int state = this.state;
+            if (state > ST_CONNECTED) {
+                throw new SocketException("socket closed");
+            } else if (state != ST_UNCONNECTED) {
+                throw new SocketException("already connected");
+            }
+            this.state = ST_CONNECTING;
+            readerThread = NativeThread.current();
+        }
+    }
+
+    /**
+     * Marks the end of a connect operation that may have blocked.
+     *
+     * @throws SocketException is the socket is closed
+     */
+    private void endConnect(boolean completed) throws IOException {
+        synchronized (stateLock) {
+            readerThread = 0;
+            int state = this.state;
+            if (state == ST_CLOSING)
+                stateLock.notifyAll();
+            if (completed && state == ST_CONNECTING) {
+                this.state = ST_CONNECTED;
+            } else if (!completed && state >= ST_CLOSING) {
+                throw new SocketException("socket closed");
+            }
+        }
+    }
+
+    /**
      * Marks the beginning of a read operation that might block.
      *
-     * @throws SocketException if the socket is closed
+     * @throws SocketException if the socket is closed or not connected
      */
     private void beginRead() throws SocketException {
         synchronized (stateLock) {
-            ensureOpen();
+            ensureOpenAndConnected();
             readerThread = NativeThread.current();
         }
     }
@@ -170,11 +229,11 @@
     /**
      * Marks the beginning of a write operation that might block.
      *
-     * @throws SocketException if the socket is closed
+     * @throws SocketException if the socket is closed or not connected
      */
     private void beginWrite() throws SocketException {
         synchronized (stateLock) {
-            ensureOpen();
+            ensureOpenAndConnected();
             writerThread = NativeThread.current();
         }
     }
@@ -196,6 +255,55 @@
     }
 
     /**
+     * Connect the socket to the given address/port.
+     *
+     * @throws IOException if the socket is closed or an I/O occurs
+     * @throws SocketTimeoutException if the connect timeout elapses
+     */
+    public void connect(InetAddress address, int port, long millis) throws IOException {
+        readLock.lock();
+        try {
+            writeLock.lock();
+            try {
+                boolean connected = false;
+                try {
+                    beginConnect();
+                    int n = Net.connect(fd, address, port);
+                    if (n == IOStatus.UNAVAILABLE && isOpen()) {
+                        long nanos = NANOSECONDS.convert(millis, MILLISECONDS);
+                        if (nanos > 0) {
+                            // connect with timeout
+                            do {
+                                long startTime = System.nanoTime();
+                                park(Net.POLLOUT, nanos);
+                                n = SocketChannelImpl.checkConnect(fd, false);
+                                if (n == IOStatus.UNAVAILABLE) {
+                                    nanos -= System.nanoTime() - startTime;
+                                    if (nanos <= 0)
+                                        throw new SocketTimeoutException();
+                                }
+                            } while (n == IOStatus.UNAVAILABLE && isOpen());
+                        } else {
+                            // connect, no timeout
+                            do {
+                                park(Net.POLLOUT, 0);
+                                n = SocketChannelImpl.checkConnect(fd, false);
+                            } while (n == IOStatus.UNAVAILABLE && isOpen());
+                        }
+                    }
+                    connected = (n > 0) && isOpen();
+                } finally {
+                    endConnect(connected);
+                }
+            } finally {
+                writeLock.unlock();
+            }
+        } finally {
+            readLock.unlock();
+        }
+    }
+
+    /**
      * Reads bytes from the socket into the given buffer.
      *
      * @throws IOException if the socket is closed or an I/O occurs
@@ -253,11 +361,9 @@
             beginWrite();
             try {
                 n = IOUtil.write(fd, dst, -1, nd);
-                if (n == IOStatus.UNAVAILABLE && isOpen()) {
-                    do {
-                        park(Net.POLLOUT, 0);
-                        n = IOUtil.write(fd, dst, -1, nd);
-                    } while (n == IOStatus.UNAVAILABLE && isOpen());
+                while (n == IOStatus.UNAVAILABLE && isOpen()) {
+                    park(Net.POLLOUT, 0);
+                    n = IOUtil.write(fd, dst, -1, nd);
                 }
                 return n;
             } finally {
@@ -274,7 +380,7 @@
     private int available() throws IOException {
         readLock.lock();
         try {
-            ensureOpen();
+            ensureOpenAndConnected();
             if (isInputClosed) {
                 return 0;
             } else {
--- a/test/jdk/java/lang/Fiber/NetSockets.java	Fri Jan 04 19:52:34 2019 +0000
+++ b/test/jdk/java/lang/Fiber/NetSockets.java	Mon Jan 07 17:46:35 2019 +0000
@@ -48,6 +48,26 @@
     }
 
     /**
+     * Cancel a fiber in connect
+     */
+    public void testSocketConnectCancel1() {
+        test(() -> {
+            try (var listener = new ServerSocket()) {
+                listener.bind(new InetSocketAddress( InetAddress.getLocalHost(), 0));
+                Fiber.current().map(Fiber::cancel);
+                Socket s = new Socket();
+                try {
+                    s.connect(listener.getLocalSocketAddress());
+                    assertTrue(false);
+                } catch (IOException expected) {
+                } finally {
+                    s.close();
+                }
+            }
+        });
+    }
+
+    /**
      * Socket read/write, no blocking
      */
     public void testSocketReadWrite1() {