changeset 1074:5303aece2068

6801020: Concurrent Semaphore release may cause some require thread not signaled Summary: Introduce PROPAGATE waitStatus Reviewed-by: martin
author dl
date Thu, 26 Mar 2009 11:59:07 -0700
parents 2dae30c4d687
children 4a685f3f3ba8
files src/share/classes/java/util/concurrent/locks/AbstractQueuedLongSynchronizer.java src/share/classes/java/util/concurrent/locks/AbstractQueuedSynchronizer.java test/java/util/concurrent/Semaphore/RacingReleases.java
diffstat 3 files changed, 322 insertions(+), 80 deletions(-) [+]
line wrap: on
line diff
--- a/src/share/classes/java/util/concurrent/locks/AbstractQueuedLongSynchronizer.java	Wed Mar 25 12:24:30 2009 -0700
+++ b/src/share/classes/java/util/concurrent/locks/AbstractQueuedLongSynchronizer.java	Thu Mar 26 11:59:07 2009 -0700
@@ -166,6 +166,11 @@
         static final int SIGNAL    = -1;
         /** waitStatus value to indicate thread is waiting on condition */
         static final int CONDITION = -2;
+        /**
+         * waitStatus value to indicate the next acquireShared should
+         * unconditionally propagate
+         */
+        static final int PROPAGATE = -3;
 
         /**
          * Status field, taking on only the values:
@@ -180,10 +185,16 @@
          *               Nodes never leave this state. In particular,
          *               a thread with cancelled node never again blocks.
          *   CONDITION:  This node is currently on a condition queue.
-         *               It will not be used as a sync queue node until
-         *               transferred. (Use of this value here
-         *               has nothing to do with the other uses
-         *               of the field, but simplifies mechanics.)
+         *               It will not be used as a sync queue node
+         *               until transferred, at which time the status
+         *               will be set to 0. (Use of this value here has
+         *               nothing to do with the other uses of the
+         *               field, but simplifies mechanics.)
+         *   PROPAGATE:  A releaseShared should be propagated to other
+         *               nodes. This is set (for head node only) in
+         *               doReleaseShared to ensure propagation
+         *               continues, even if other operations have
+         *               since intervened.
          *   0:          None of the above
          *
          * The values are arranged numerically to simplify use.
@@ -403,10 +414,13 @@
      */
     private void unparkSuccessor(Node node) {
         /*
-         * Try to clear status in anticipation of signalling.  It is
-         * OK if this fails or if status is changed by waiting thread.
+         * If status is negative (i.e., possibly needing signal) try
+         * to clear in anticipation of signalling.  It is OK if this
+         * fails or if status is changed by waiting thread.
          */
-        compareAndSetWaitStatus(node, Node.SIGNAL, 0);
+        int ws = node.waitStatus;
+        if (ws < 0)
+            compareAndSetWaitStatus(node, ws, 0);
 
         /*
          * Thread to unpark is held in successor, which is normally
@@ -426,23 +440,70 @@
     }
 
     /**
+     * Release action for shared mode -- signal successor and ensure
+     * propagation. (Note: For exclusive mode, release just amounts
+     * to calling unparkSuccessor of head if it needs signal.)
+     */
+    private void doReleaseShared() {
+        /*
+         * Ensure that a release propagates, even if there are other
+         * in-progress acquires/releases.  This proceeds in the usual
+         * way of trying to unparkSuccessor of head if it needs
+         * signal. But if it does not, status is set to PROPAGATE to
+         * ensure that upon release, propagation continues.
+         * Additionally, we must loop in case a new node is added
+         * while we are doing this. Also, unlike other uses of
+         * unparkSuccessor, we need to know if CAS to reset status
+         * fails, if so rechecking.
+         */
+        for (;;) {
+            Node h = head;
+            if (h != null && h != tail) {
+                int ws = h.waitStatus;
+                if (ws == Node.SIGNAL) {
+                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
+                        continue;            // loop to recheck cases
+                    unparkSuccessor(h);
+                }
+                else if (ws == 0 &&
+                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
+                    continue;                // loop on failed CAS
+            }
+            if (h == head)                   // loop if head changed
+                break;
+        }
+    }
+
+    /**
      * Sets head of queue, and checks if successor may be waiting
-     * in shared mode, if so propagating if propagate > 0.
+     * in shared mode, if so propagating if either propagate > 0 or
+     * PROPAGATE status was set.
      *
-     * @param pred the node holding waitStatus for node
      * @param node the node
      * @param propagate the return value from a tryAcquireShared
      */
     private void setHeadAndPropagate(Node node, long propagate) {
+        Node h = head; // Record old head for check below
         setHead(node);
-        if (propagate > 0 && node.waitStatus != 0) {
-            /*
-             * Don't bother fully figuring out successor.  If it
-             * looks null, call unparkSuccessor anyway to be safe.
-             */
+        /*
+         * Try to signal next queued node if:
+         *   Propagation was indicated by caller,
+         *     or was recorded (as h.waitStatus) by a previous operation
+         *     (note: this uses sign-check of waitStatus because
+         *      PROPAGATE status may transition to SIGNAL.)
+         * and
+         *   The next node is waiting in shared mode,
+         *     or we don't know, because it appears null
+         *
+         * The conservatism in both of these checks may cause
+         * unnecessary wake-ups, but only when there are multiple
+         * racing acquires/releases, so most need signals now or soon
+         * anyway.
+         */
+        if (propagate > 0 || h == null || h.waitStatus < 0) {
             Node s = node.next;
             if (s == null || s.isShared())
-                unparkSuccessor(node);
+                doReleaseShared();
         }
     }
 
@@ -465,23 +526,27 @@
         while (pred.waitStatus > 0)
             node.prev = pred = pred.prev;
 
-        // Getting this before setting waitStatus ensures staleness
+        // predNext is the apparent node to unsplice. CASes below will
+        // fail if not, in which case, we lost race vs another cancel
+        // or signal, so no further action is necessary.
         Node predNext = pred.next;
 
-        // Can use unconditional write instead of CAS here
+        // Can use unconditional write instead of CAS here.
+        // After this atomic step, other Nodes can skip past us.
+        // Before, we are free of interference from other threads.
         node.waitStatus = Node.CANCELLED;
 
-        // If we are the tail, remove ourselves
+        // If we are the tail, remove ourselves.
         if (node == tail && compareAndSetTail(node, pred)) {
             compareAndSetNext(pred, predNext, null);
         } else {
-            // If "active" predecessor found...
-            if (pred != head
-                && (pred.waitStatus == Node.SIGNAL
-                    || compareAndSetWaitStatus(pred, 0, Node.SIGNAL))
-                && pred.thread != null) {
-
-                // If successor is active, set predecessor's next link
+            // If successor needs signal, try to set pred's next-link
+            // so it will get one. Otherwise wake it up to propagate.
+            int ws;
+            if (pred != head &&
+                ((ws = pred.waitStatus) == Node.SIGNAL ||
+                 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
+                pred.thread != null) {
                 Node next = node.next;
                 if (next != null && next.waitStatus <= 0)
                     compareAndSetNext(pred, predNext, next);
@@ -503,14 +568,14 @@
      * @return {@code true} if thread should block
      */
     private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
-        int s = pred.waitStatus;
-        if (s < 0)
+        int ws = pred.waitStatus;
+        if (ws == Node.SIGNAL)
             /*
              * This node has already set status asking a release
              * to signal it, so it can safely park.
              */
             return true;
-        if (s > 0) {
+        if (ws > 0) {
             /*
              * Predecessor was cancelled. Skip over predecessors and
              * indicate retry.
@@ -519,14 +584,14 @@
                 node.prev = pred = pred.prev;
             } while (pred.waitStatus > 0);
             pred.next = node;
+        } else {
+            /*
+             * waitStatus must be 0 or PROPAGATE.  Indicate that we
+             * need a signal, but don't park yet.  Caller will need to
+             * retry to make sure it cannot acquire before parking.
+             */
+            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
         }
-        else
-            /*
-             * Indicate that we need a signal, but don't park yet. Caller
-             * will need to retry to make sure it cannot acquire before
-             * parking.
-             */
-            compareAndSetWaitStatus(pred, 0, Node.SIGNAL);
         return false;
     }
 
@@ -1046,9 +1111,7 @@
      */
     public final boolean releaseShared(long arg) {
         if (tryReleaseShared(arg)) {
-            Node h = head;
-            if (h != null && h.waitStatus != 0)
-                unparkSuccessor(h);
+            doReleaseShared();
             return true;
         }
         return false;
@@ -1390,8 +1453,8 @@
          * case the waitStatus can be transiently and harmlessly wrong).
          */
         Node p = enq(node);
-        int c = p.waitStatus;
-        if (c > 0 || !compareAndSetWaitStatus(p, c, Node.SIGNAL))
+        int ws = p.waitStatus;
+        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
             LockSupport.unpark(node.thread);
         return true;
     }
--- a/src/share/classes/java/util/concurrent/locks/AbstractQueuedSynchronizer.java	Wed Mar 25 12:24:30 2009 -0700
+++ b/src/share/classes/java/util/concurrent/locks/AbstractQueuedSynchronizer.java	Thu Mar 26 11:59:07 2009 -0700
@@ -389,6 +389,11 @@
         static final int SIGNAL    = -1;
         /** waitStatus value to indicate thread is waiting on condition */
         static final int CONDITION = -2;
+        /**
+         * waitStatus value to indicate the next acquireShared should
+         * unconditionally propagate
+         */
+        static final int PROPAGATE = -3;
 
         /**
          * Status field, taking on only the values:
@@ -403,10 +408,16 @@
          *               Nodes never leave this state. In particular,
          *               a thread with cancelled node never again blocks.
          *   CONDITION:  This node is currently on a condition queue.
-         *               It will not be used as a sync queue node until
-         *               transferred. (Use of this value here
-         *               has nothing to do with the other uses
-         *               of the field, but simplifies mechanics.)
+         *               It will not be used as a sync queue node
+         *               until transferred, at which time the status
+         *               will be set to 0. (Use of this value here has
+         *               nothing to do with the other uses of the
+         *               field, but simplifies mechanics.)
+         *   PROPAGATE:  A releaseShared should be propagated to other
+         *               nodes. This is set (for head node only) in
+         *               doReleaseShared to ensure propagation
+         *               continues, even if other operations have
+         *               since intervened.
          *   0:          None of the above
          *
          * The values are arranged numerically to simplify use.
@@ -626,10 +637,13 @@
      */
     private void unparkSuccessor(Node node) {
         /*
-         * Try to clear status in anticipation of signalling.  It is
-         * OK if this fails or if status is changed by waiting thread.
+         * If status is negative (i.e., possibly needing signal) try
+         * to clear in anticipation of signalling.  It is OK if this
+         * fails or if status is changed by waiting thread.
          */
-        compareAndSetWaitStatus(node, Node.SIGNAL, 0);
+        int ws = node.waitStatus;
+        if (ws < 0)
+            compareAndSetWaitStatus(node, ws, 0);
 
         /*
          * Thread to unpark is held in successor, which is normally
@@ -649,23 +663,70 @@
     }
 
     /**
+     * Release action for shared mode -- signal successor and ensure
+     * propagation. (Note: For exclusive mode, release just amounts
+     * to calling unparkSuccessor of head if it needs signal.)
+     */
+    private void doReleaseShared() {
+        /*
+         * Ensure that a release propagates, even if there are other
+         * in-progress acquires/releases.  This proceeds in the usual
+         * way of trying to unparkSuccessor of head if it needs
+         * signal. But if it does not, status is set to PROPAGATE to
+         * ensure that upon release, propagation continues.
+         * Additionally, we must loop in case a new node is added
+         * while we are doing this. Also, unlike other uses of
+         * unparkSuccessor, we need to know if CAS to reset status
+         * fails, if so rechecking.
+         */
+        for (;;) {
+            Node h = head;
+            if (h != null && h != tail) {
+                int ws = h.waitStatus;
+                if (ws == Node.SIGNAL) {
+                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
+                        continue;            // loop to recheck cases
+                    unparkSuccessor(h);
+                }
+                else if (ws == 0 &&
+                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
+                    continue;                // loop on failed CAS
+            }
+            if (h == head)                   // loop if head changed
+                break;
+        }
+    }
+
+    /**
      * Sets head of queue, and checks if successor may be waiting
-     * in shared mode, if so propagating if propagate > 0.
+     * in shared mode, if so propagating if either propagate > 0 or
+     * PROPAGATE status was set.
      *
-     * @param pred the node holding waitStatus for node
      * @param node the node
      * @param propagate the return value from a tryAcquireShared
      */
     private void setHeadAndPropagate(Node node, int propagate) {
+        Node h = head; // Record old head for check below
         setHead(node);
-        if (propagate > 0 && node.waitStatus != 0) {
-            /*
-             * Don't bother fully figuring out successor.  If it
-             * looks null, call unparkSuccessor anyway to be safe.
-             */
+        /*
+         * Try to signal next queued node if:
+         *   Propagation was indicated by caller,
+         *     or was recorded (as h.waitStatus) by a previous operation
+         *     (note: this uses sign-check of waitStatus because
+         *      PROPAGATE status may transition to SIGNAL.)
+         * and
+         *   The next node is waiting in shared mode,
+         *     or we don't know, because it appears null
+         *
+         * The conservatism in both of these checks may cause
+         * unnecessary wake-ups, but only when there are multiple
+         * racing acquires/releases, so most need signals now or soon
+         * anyway.
+         */
+        if (propagate > 0 || h == null || h.waitStatus < 0) {
             Node s = node.next;
             if (s == null || s.isShared())
-                unparkSuccessor(node);
+                doReleaseShared();
         }
     }
 
@@ -688,23 +749,27 @@
         while (pred.waitStatus > 0)
             node.prev = pred = pred.prev;
 
-        // Getting this before setting waitStatus ensures staleness
+        // predNext is the apparent node to unsplice. CASes below will
+        // fail if not, in which case, we lost race vs another cancel
+        // or signal, so no further action is necessary.
         Node predNext = pred.next;
 
-        // Can use unconditional write instead of CAS here
+        // Can use unconditional write instead of CAS here.
+        // After this atomic step, other Nodes can skip past us.
+        // Before, we are free of interference from other threads.
         node.waitStatus = Node.CANCELLED;
 
-        // If we are the tail, remove ourselves
+        // If we are the tail, remove ourselves.
         if (node == tail && compareAndSetTail(node, pred)) {
             compareAndSetNext(pred, predNext, null);
         } else {
-            // If "active" predecessor found...
-            if (pred != head
-                && (pred.waitStatus == Node.SIGNAL
-                    || compareAndSetWaitStatus(pred, 0, Node.SIGNAL))
-                && pred.thread != null) {
-
-                // If successor is active, set predecessor's next link
+            // If successor needs signal, try to set pred's next-link
+            // so it will get one. Otherwise wake it up to propagate.
+            int ws;
+            if (pred != head &&
+                ((ws = pred.waitStatus) == Node.SIGNAL ||
+                 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
+                pred.thread != null) {
                 Node next = node.next;
                 if (next != null && next.waitStatus <= 0)
                     compareAndSetNext(pred, predNext, next);
@@ -726,14 +791,14 @@
      * @return {@code true} if thread should block
      */
     private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
-        int s = pred.waitStatus;
-        if (s < 0)
+        int ws = pred.waitStatus;
+        if (ws == Node.SIGNAL)
             /*
              * This node has already set status asking a release
              * to signal it, so it can safely park.
              */
             return true;
-        if (s > 0) {
+        if (ws > 0) {
             /*
              * Predecessor was cancelled. Skip over predecessors and
              * indicate retry.
@@ -742,14 +807,14 @@
                 node.prev = pred = pred.prev;
             } while (pred.waitStatus > 0);
             pred.next = node;
+        } else {
+            /*
+             * waitStatus must be 0 or PROPAGATE.  Indicate that we
+             * need a signal, but don't park yet.  Caller will need to
+             * retry to make sure it cannot acquire before parking.
+             */
+            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
         }
-        else
-            /*
-             * Indicate that we need a signal, but don't park yet. Caller
-             * will need to retry to make sure it cannot acquire before
-             * parking.
-             */
-            compareAndSetWaitStatus(pred, 0, Node.SIGNAL);
         return false;
     }
 
@@ -1269,9 +1334,7 @@
      */
     public final boolean releaseShared(int arg) {
         if (tryReleaseShared(arg)) {
-            Node h = head;
-            if (h != null && h.waitStatus != 0)
-                unparkSuccessor(h);
+            doReleaseShared();
             return true;
         }
         return false;
@@ -1613,8 +1676,8 @@
          * case the waitStatus can be transiently and harmlessly wrong).
          */
         Node p = enq(node);
-        int c = p.waitStatus;
-        if (c > 0 || !compareAndSetWaitStatus(p, c, Node.SIGNAL))
+        int ws = p.waitStatus;
+        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
             LockSupport.unpark(node.thread);
         return true;
     }
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/test/java/util/concurrent/Semaphore/RacingReleases.java	Thu Mar 26 11:59:07 2009 -0700
@@ -0,0 +1,116 @@
+/*
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
+ * CA 95054 USA or visit www.sun.com if you need additional information or
+ * have any questions.
+ */
+
+/*
+ * This file is available under and governed by the GNU General Public
+ * License version 2 only, as published by the Free Software Foundation.
+ * However, the following notice accompanied the original version of this
+ * file:
+ *
+ * Written by Doug Lea with assistance from members of JCP JSR-166
+ * Expert Group and released to the public domain, as explained at
+ * http://creativecommons.org/licenses/publicdomain
+ */
+
+/*
+ * @test
+ * @bug 6801020 6803402
+ * @summary Try to tickle race conditions in
+ * AbstractQueuedSynchronizer "shared" code
+ */
+
+import java.util.concurrent.Semaphore;
+
+public class RacingReleases {
+
+    /** Increase this for better chance of tickling races */
+    static final int iterations = 1000;
+
+    public static void test(final boolean fair,
+                            final boolean interruptibly)
+        throws Throwable {
+        for (int i = 0; i < iterations; i++) {
+            final Semaphore sem = new Semaphore(0, fair);
+            final Throwable[] badness = new Throwable[1];
+            Runnable blocker = interruptibly ?
+                new Runnable() {
+                    public void run() {
+                        try {
+                            sem.acquire();
+                        } catch (Throwable t) {
+                            badness[0] = t;
+                            throw new Error(t);
+                        }}}
+                :
+                new Runnable() {
+                    public void run() {
+                        try {
+                            sem.acquireUninterruptibly();
+                        } catch (Throwable t) {
+                            badness[0] = t;
+                            throw new Error(t);
+                        }}};
+
+            Thread b1 = new Thread(blocker);
+            Thread b2 = new Thread(blocker);
+            Runnable signaller = new Runnable() {
+                public void run() {
+                    try {
+                        sem.release();
+                    } catch (Throwable t) {
+                        badness[0] = t;
+                        throw new Error(t);
+                    }}};
+            Thread s1 = new Thread(signaller);
+            Thread s2 = new Thread(signaller);
+            Thread[] threads = { b1, b2, s1, s2 };
+            java.util.Collections.shuffle(java.util.Arrays.asList(threads));
+            for (Thread thread : threads)
+                thread.start();
+            for (Thread thread : threads) {
+                thread.join(60 * 1000);
+                if (thread.isAlive())
+                    throw new Error
+                        (String.format
+                         ("Semaphore stuck: permits %d, thread waiting %s%n",
+                          sem.availablePermits(),
+                          sem.hasQueuedThreads() ? "true" : "false"));
+            }
+            if (badness[0] != null)
+                throw new Error(badness[0]);
+            if (sem.availablePermits() != 0)
+              throw new Error(String.valueOf(sem.availablePermits()));
+            if (sem.hasQueuedThreads())
+              throw new Error(String.valueOf(sem.hasQueuedThreads()));
+            if (sem.getQueueLength() != 0)
+              throw new Error(String.valueOf(sem.getQueueLength()));
+            if (sem.isFair() != fair)
+              throw new Error(String.valueOf(sem.isFair()));
+        }
+    }
+
+    public static void main(String[] args) throws Throwable {
+        for (boolean fair : new boolean[] { true, false })
+            for (boolean interruptibly : new boolean[] { true, false })
+                test(fair, interruptibly);
+    }
+}