changeset 53176:77f31b03cc0e

8215326: Test java/util/concurrent/ConcurrentHashMap/ToArray.java hangs after j.u.c updates Reviewed-by: martin, dholmes
author dl
date Wed, 12 Dec 2018 20:13:39 -0800
parents 85ade44f351a
children 8b585e1b2805
files src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java
diffstat 1 files changed, 53 insertions(+), 62 deletions(-) [+]
line wrap: on
line diff
--- a/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java	Thu Dec 13 12:03:03 2018 +0800
+++ b/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java	Wed Dec 12 20:13:39 2018 -0800
@@ -445,7 +445,8 @@
      * if to its current value).  This would be extremely costly. So
      * we relax it in several ways: (1) Producers only signal when
      * their queue is possibly empty at some point during a push
-     * operation. (2) Other workers propagate this signal
+     * operation (which requires conservatively checking size zero or
+     * one to cover races). (2) Other workers propagate this signal
      * when they find tasks in a queue with size greater than one. (3)
      * Workers only enqueue after scanning (see below) and not finding
      * any tasks.  (4) Rather than CASing ctl to its current value in
@@ -761,8 +762,10 @@
 
     /**
      * The maximum number of top-level polls per worker before
-     * checking other queues, expressed as a bit shift.  See above for
-     * rationale.
+     * checking other queues, expressed as a bit shift to, in effect,
+     * multiply by pool size, and then use as random value mask, so
+     * average bound is about poolSize*(1<<TOP_BOUND_SHIFT).  See
+     * above for rationale.
      */
     static final int TOP_BOUND_SHIFT = 10;
 
@@ -838,17 +841,18 @@
          */
         final void push(ForkJoinTask<?> task) {
             ForkJoinTask<?>[] a;
-            int s = top, d = s - base, cap, m;
+            int s = top, d, cap, m;
             ForkJoinPool p = pool;
             if ((a = array) != null && (cap = a.length) > 0) {
                 QA.setRelease(a, (m = cap - 1) & s, task);
                 top = s + 1;
-                if (d == m)
+                if (((d = s - (int)BASE.getAcquire(this)) & ~1) == 0 &&
+                    p != null) {                 // size 0 or 1
+                    VarHandle.fullFence();
+                    p.signalWork();
+                }
+                else if (d == m)
                     growArray(false);
-                else if (QA.getAcquire(a, m & (s - 1)) == null && p != null) {
-                    VarHandle.fullFence();  // was empty
-                    p.signalWork(null);
-                }
             }
         }
 
@@ -859,16 +863,16 @@
         final boolean lockedPush(ForkJoinTask<?> task) {
             ForkJoinTask<?>[] a;
             boolean signal = false;
-            int s = top, d = s - base, cap, m;
+            int s = top, b = base, cap, d;
             if ((a = array) != null && (cap = a.length) > 0) {
-                a[(m = (cap - 1)) & s] = task;
+                a[(cap - 1) & s] = task;
                 top = s + 1;
-                if (d == m)
+                if (b - s + cap - 1 == 0)
                     growArray(true);
                 else {
                     phase = 0; // full volatile unlock
-                    if (a[m & (s - 1)] == null)
-                        signal = true;   // was empty
+                    if (((s - base) & ~1) == 0) // size 0 or 1
+                        signal = true;
                 }
             }
             return signal;
@@ -1010,30 +1014,25 @@
          * queue, up to bound n (to avoid infinite unfairness).
          */
         final void topLevelExec(ForkJoinTask<?> t, WorkQueue q, int n) {
-            int nstolen = 1;
-            for (int j = 0;;) {
-                if (t != null)
+            if (t != null && q != null) { // hoist checks
+                int nstolen = 1;
+                for (;;) {
                     t.doExec();
-                if (j++ <= n)
-                    t = nextLocalTask();
-                else {
-                    j = 0;
-                    t = null;
+                    if (n-- < 0)
+                        break;
+                    else if ((t = nextLocalTask()) == null) {
+                        if ((t = q.poll()) == null)
+                            break;
+                        else
+                            ++nstolen;
+                    }
                 }
-                if (t == null) {
-                    if (q != null && (t = q.poll()) != null) {
-                        ++nstolen;
-                        j = 0;
-                    }
-                    else if (j != 0)
-                        break;
-                }
+                ForkJoinWorkerThread thread = owner;
+                nsteals += nstolen;
+                source = 0;
+                if (thread != null)
+                    thread.afterTopLevelExec();
             }
-            ForkJoinWorkerThread thread = owner;
-            nsteals += nstolen;
-            source = 0;
-            if (thread != null)
-                thread.afterTopLevelExec();
         }
 
         /**
@@ -1456,7 +1455,7 @@
 
         if (!tryTerminate(false, false) &&            // possibly replace worker
             w != null && w.array != null)             // avoid repeated failures
-            signalWork(null);
+            signalWork();
 
         if (ex == null)                               // help clean on way out
             ForkJoinTask.helpExpungeStaleExceptions();
@@ -1466,9 +1465,8 @@
 
     /**
      * Tries to create or release a worker if too few are running.
-     * @param q if non-null recheck if empty on CAS failure
      */
-    final void signalWork(WorkQueue q) {
+    final void signalWork() {
         for (;;) {
             long c; int sp; WorkQueue[] ws; int i; WorkQueue v;
             if ((c = ctl) >= 0L)                      // enough workers
@@ -1495,8 +1493,6 @@
                         LockSupport.unpark(vt);
                     break;
                 }
-                else if (q != null && q.isEmpty())     // no need to retry
-                    break;
             }
         }
     }
@@ -1617,24 +1613,19 @@
                 else if (rc <= 0 && (md & SHUTDOWN) != 0 &&
                          tryTerminate(false, false))
                     break;                        // quiescent shutdown
-                else if (w.phase < 0) {
-                    if (rc <= 0 && pred != 0 && phase == (int)c) {
-                        long nc = (UC_MASK & (c - TC_UNIT)) | (SP_MASK & pred);
-                        long d = keepAlive + System.currentTimeMillis();
-                        LockSupport.parkUntil(this, d);
-                        if (ctl == c &&           // drop on timeout if all idle
-                            d - System.currentTimeMillis() <= TIMEOUT_SLOP &&
-                            CTL.compareAndSet(this, c, nc)) {
-                            w.phase = QUIET;
-                            break;
-                        }
-                    }
-                    else {
-                        LockSupport.park(this);
-                        if (w.phase < 0)          // one spurious wakeup check
-                            LockSupport.park(this);
+                else if (rc <= 0 && pred != 0 && phase == (int)c) {
+                    long nc = (UC_MASK & (c - TC_UNIT)) | (SP_MASK & pred);
+                    long d = keepAlive + System.currentTimeMillis();
+                    LockSupport.parkUntil(this, d);
+                    if (ctl == c &&               // drop on timeout if all idle
+                        d - System.currentTimeMillis() <= TIMEOUT_SLOP &&
+                        CTL.compareAndSet(this, c, nc)) {
+                        w.phase = QUIET;
+                        break;
                     }
                 }
+                else if (w.phase < 0)
+                    LockSupport.park(this);       // OK if spuriously woken
                 w.source = 0;                     // disable signal
             }
         }
@@ -1650,8 +1641,8 @@
         WorkQueue[] ws; int n;
         if ((ws = workQueues) != null && (n = ws.length) > 0 && w != null) {
             for (int m = n - 1, j = r & m;;) {
-                WorkQueue q; int b, s;
-                if ((q = ws[j]) != null && (s = q.top) != (b = q.base)) {
+                WorkQueue q; int b;
+                if ((q = ws[j]) != null && q.top != (b = q.base)) {
                     int qid = q.id;
                     ForkJoinTask<?>[] a; int cap, k; ForkJoinTask<?> t;
                     if ((a = q.array) != null && (cap = a.length) > 0) {
@@ -1660,10 +1651,10 @@
                             QA.compareAndSet(a, k, t, null)) {
                             q.base = b;
                             w.source = qid;
-                            if (s != b && a[(cap - 1) & b] != null)
-                                signalWork(q);    // help signal if more tasks
+                            if (q.top - b > 0)
+                                signalWork();
                             w.topLevelExec(t, q,  // random fairness bound
-                                           (r | (1 << TOP_BOUND_SHIFT)) & SMASK);
+                                           r & ((n << TOP_BOUND_SHIFT) - 1));
                         }
                     }
                     return true;
@@ -1909,7 +1900,7 @@
                 r = ThreadLocalRandom.advanceProbe(r);
             else {
                 if (q.lockedPush(task))
-                    signalWork(null);
+                    signalWork();
                 return;
             }
         }