changeset 17575:46d6f1587c45

8177935: java/net/httpclient/http2/FixedThreadPoolTest.java fails frequently Summary: fixes a race condition in AsyncWriteQueue Reviewed-by: chegar
author dfuchs
date Thu, 17 Aug 2017 16:48:45 +0100
parents 6256e94781f5
children 51f1ea2a3d3f 402802492f6a
files src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/AsyncSSLDelegate.java src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/PlainHttpConnection.java src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/AsyncWriteQueue.java test/java/net/httpclient/http2/FixedThreadPoolTest.java
diffstat 4 files changed, 28 insertions(+), 12 deletions(-) [+]
line wrap: on
line diff
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/AsyncSSLDelegate.java	Wed Aug 16 16:46:51 2017 -0400
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/AsyncSSLDelegate.java	Thu Aug 17 16:48:45 2017 +0100
@@ -196,7 +196,7 @@
      * This same method is called to try and resume output after a blocking
      * handshaking operation has completed.
      */
-    private void upperWrite(ByteBufferReference[] refs, AsyncWriteQueue delayCallback) {
+    private boolean upperWrite(ByteBufferReference[] refs, AsyncWriteQueue delayCallback) {
         // currently delayCallback is not used. Use it when it's needed to execute handshake in another thread.
         try {
             ByteBuffer[] buffers = ByteBufferReference.toBuffers(refs);
@@ -230,6 +230,9 @@
             closeExceptionally(t);
             errorHandler.accept(t);
         }
+        // We always return true: either all the data was sent, or
+        // an exception happened and we have closed the queue.
+        return true;
     }
 
     // Connecting at this level means the initial handshake has completed.
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/PlainHttpConnection.java	Wed Aug 16 16:46:51 2017 -0400
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/PlainHttpConnection.java	Thu Aug 17 16:48:45 2017 +0100
@@ -231,7 +231,7 @@
         assert false;
     }
 
-    void asyncOutput(ByteBufferReference[] refs, AsyncWriteQueue delayCallback) {
+    boolean asyncOutput(ByteBufferReference[] refs, AsyncWriteQueue delayCallback) {
         try {
             ByteBuffer[] bufs = ByteBufferReference.toBuffers(refs);
             while (Utils.remaining(bufs) > 0) {
@@ -239,13 +239,14 @@
                 if (n == 0) {
                     delayCallback.setDelayed(refs);
                     client.registerEvent(new WriteEvent());
-                    return;
+                    return false;
                 }
             }
             ByteBufferReference.clear(refs);
         } catch (IOException e) {
             shutdown();
         }
+        return true;
     }
 
     @Override
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/AsyncWriteQueue.java	Wed Aug 16 16:46:51 2017 -0400
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/AsyncWriteQueue.java	Thu Aug 17 16:48:45 2017 +0100
@@ -27,17 +27,31 @@
 
 import java.io.Closeable;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Deque;
 import java.util.List;
 import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.BiConsumer;
 
 public class AsyncWriteQueue implements Closeable {
 
+    @FunctionalInterface
+    public static interface AsyncConsumer {
+        /**
+         * Takes an array of buffer reference and attempt to send the data
+         * downstream. If not all the data can be sent, then push back
+         * to the source queue by calling {@code source.setDelayed(buffers)}
+         * and return false. If all the data was successfully sent downstream
+         * then returns true.
+         * @param buffers An array of ButeBufferReference containing data
+         *                to send downstream.
+         * @param source This AsyncWriteQueue.
+         * @return true if all the data could be sent downstream, false otherwise.
+         */
+        boolean trySend(ByteBufferReference[] buffers, AsyncWriteQueue source);
+    }
+
     private static final int IDLE    = 0;     // nobody is flushing from the queue
     private static final int FLUSHING = 1;    // there is the only thread flushing from the queue
     private static final int REFLUSHING = 2;  // while one thread was flushing from the queue
@@ -51,7 +65,7 @@
 
     private final AtomicInteger state = new AtomicInteger(IDLE);
     private final Deque<ByteBufferReference[]> queue = new ConcurrentLinkedDeque<>();
-    private final BiConsumer<ByteBufferReference[], AsyncWriteQueue> consumeAction;
+    private final AsyncConsumer consumeAction;
 
     // Queue may be processed in two modes:
     // 1. if(!doFullDrain) - invoke callback on each chunk
@@ -60,11 +74,11 @@
 
     private ByteBufferReference[] delayedElement = null;
 
-    public AsyncWriteQueue(BiConsumer<ByteBufferReference[], AsyncWriteQueue> consumeAction) {
+    public AsyncWriteQueue(AsyncConsumer consumeAction) {
         this(consumeAction, true);
     }
 
-    public AsyncWriteQueue(BiConsumer<ByteBufferReference[], AsyncWriteQueue> consumeAction, boolean doFullDrain) {
+    public AsyncWriteQueue(AsyncConsumer consumeAction, boolean doFullDrain) {
         this.consumeAction = consumeAction;
         this.doFullDrain = doFullDrain;
     }
@@ -156,8 +170,7 @@
         }
         while(true) {
             while (element != null) {
-                consumeAction.accept(element, this);
-                if (state.get() == DELAYED) {
+                if (!consumeAction.trySend(element, this)) {
                     return;
                 }
                 element = drain();
--- a/test/java/net/httpclient/http2/FixedThreadPoolTest.java	Wed Aug 16 16:46:51 2017 -0400
+++ b/test/java/net/httpclient/http2/FixedThreadPoolTest.java	Thu Aug 17 16:48:45 2017 +0100
@@ -23,8 +23,7 @@
 
 /*
  * @test
- * @bug 8087112
- * @key intermittent
+ * @bug 8087112 8177935
  * @library /lib/testlibrary server
  * @build jdk.testlibrary.SimpleSSLContext
  * @modules jdk.incubator.httpclient/jdk.incubator.http.internal.common