changeset 48252:98801bd22f5b

8191937: Lost interrupt in AbstractQueuedSynchronizer when tryAcquire methods throw Reviewed-by: martin, psandoz
author dl
date Sat, 02 Dec 2017 10:03:41 -0800
parents d66e420cc482
children ff597804e8c1
files src/java.base/share/classes/java/util/concurrent/locks/AbstractQueuedLongSynchronizer.java src/java.base/share/classes/java/util/concurrent/locks/AbstractQueuedSynchronizer.java test/jdk/java/util/concurrent/tck/AbstractQueuedLongSynchronizerTest.java test/jdk/java/util/concurrent/tck/AbstractQueuedSynchronizerTest.java
diffstat 4 files changed, 180 insertions(+), 21 deletions(-) [+]
line wrap: on
line diff
--- a/src/java.base/share/classes/java/util/concurrent/locks/AbstractQueuedLongSynchronizer.java	Fri Dec 01 22:04:03 2017 -0800
+++ b/src/java.base/share/classes/java/util/concurrent/locks/AbstractQueuedLongSynchronizer.java	Sat Dec 02 10:03:41 2017 -0800
@@ -422,8 +422,8 @@
      * @return {@code true} if interrupted while waiting
      */
     final boolean acquireQueued(final Node node, long arg) {
+        boolean interrupted = false;
         try {
-            boolean interrupted = false;
             for (;;) {
                 final Node p = node.predecessor();
                 if (p == head && tryAcquire(arg)) {
@@ -431,12 +431,13 @@
                     p.next = null; // help GC
                     return interrupted;
                 }
-                if (shouldParkAfterFailedAcquire(p, node) &&
-                    parkAndCheckInterrupt())
-                    interrupted = true;
+                if (shouldParkAfterFailedAcquire(p, node))
+                    interrupted |= parkAndCheckInterrupt();
             }
         } catch (Throwable t) {
             cancelAcquire(node);
+            if (interrupted)
+                selfInterrupt();
             throw t;
         }
     }
@@ -510,8 +511,8 @@
      */
     private void doAcquireShared(long arg) {
         final Node node = addWaiter(Node.SHARED);
+        boolean interrupted = false;
         try {
-            boolean interrupted = false;
             for (;;) {
                 final Node p = node.predecessor();
                 if (p == head) {
@@ -519,18 +520,18 @@
                     if (r >= 0) {
                         setHeadAndPropagate(node, r);
                         p.next = null; // help GC
-                        if (interrupted)
-                            selfInterrupt();
                         return;
                     }
                 }
-                if (shouldParkAfterFailedAcquire(p, node) &&
-                    parkAndCheckInterrupt())
-                    interrupted = true;
+                if (shouldParkAfterFailedAcquire(p, node))
+                    interrupted |= parkAndCheckInterrupt();
             }
         } catch (Throwable t) {
             cancelAcquire(node);
             throw t;
+        } finally {
+            if (interrupted)
+                selfInterrupt();
         }
     }
 
--- a/src/java.base/share/classes/java/util/concurrent/locks/AbstractQueuedSynchronizer.java	Fri Dec 01 22:04:03 2017 -0800
+++ b/src/java.base/share/classes/java/util/concurrent/locks/AbstractQueuedSynchronizer.java	Sat Dec 02 10:03:41 2017 -0800
@@ -505,7 +505,7 @@
          *
          * @return the predecessor of this node
          */
-        final Node predecessor() throws NullPointerException {
+        final Node predecessor() {
             Node p = prev;
             if (p == null)
                 throw new NullPointerException();
@@ -902,8 +902,8 @@
      * @return {@code true} if interrupted while waiting
      */
     final boolean acquireQueued(final Node node, int arg) {
+        boolean interrupted = false;
         try {
-            boolean interrupted = false;
             for (;;) {
                 final Node p = node.predecessor();
                 if (p == head && tryAcquire(arg)) {
@@ -911,12 +911,13 @@
                     p.next = null; // help GC
                     return interrupted;
                 }
-                if (shouldParkAfterFailedAcquire(p, node) &&
-                    parkAndCheckInterrupt())
-                    interrupted = true;
+                if (shouldParkAfterFailedAcquire(p, node))
+                    interrupted |= parkAndCheckInterrupt();
             }
         } catch (Throwable t) {
             cancelAcquire(node);
+            if (interrupted)
+                selfInterrupt();
             throw t;
         }
     }
@@ -990,8 +991,8 @@
      */
     private void doAcquireShared(int arg) {
         final Node node = addWaiter(Node.SHARED);
+        boolean interrupted = false;
         try {
-            boolean interrupted = false;
             for (;;) {
                 final Node p = node.predecessor();
                 if (p == head) {
@@ -999,18 +1000,18 @@
                     if (r >= 0) {
                         setHeadAndPropagate(node, r);
                         p.next = null; // help GC
-                        if (interrupted)
-                            selfInterrupt();
                         return;
                     }
                 }
-                if (shouldParkAfterFailedAcquire(p, node) &&
-                    parkAndCheckInterrupt())
-                    interrupted = true;
+                if (shouldParkAfterFailedAcquire(p, node))
+                    interrupted |= parkAndCheckInterrupt();
             }
         } catch (Throwable t) {
             cancelAcquire(node);
             throw t;
+        } finally {
+            if (interrupted)
+                selfInterrupt();
         }
     }
 
--- a/test/jdk/java/util/concurrent/tck/AbstractQueuedLongSynchronizerTest.java	Fri Dec 01 22:04:03 2017 -0800
+++ b/test/jdk/java/util/concurrent/tck/AbstractQueuedLongSynchronizerTest.java	Sat Dec 02 10:03:41 2017 -0800
@@ -39,6 +39,7 @@
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashSet;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.locks.AbstractQueuedLongSynchronizer;
 import java.util.concurrent.locks.AbstractQueuedLongSynchronizer.ConditionObject;
 
@@ -1283,4 +1284,57 @@
         sync.release();
     }
 
+    /**
+     * Tests scenario for
+     * JDK-8191937: Lost interrupt in AbstractQueuedSynchronizer when tryAcquire methods throw
+     */
+    public void testInterruptedFailingAcquire() throws InterruptedException {
+        final RuntimeException ex = new RuntimeException();
+
+        // A synchronizer only offering a choice of failure modes
+        class Sync extends AbstractQueuedLongSynchronizer {
+            boolean pleaseThrow;
+            @Override protected boolean tryAcquire(long ignored) {
+                if (pleaseThrow) throw ex;
+                return false;
+            }
+            @Override protected long tryAcquireShared(long ignored) {
+                if (pleaseThrow) throw ex;
+                return -1;
+            }
+            @Override protected boolean tryRelease(long ignored) {
+                return true;
+            }
+            @Override protected boolean tryReleaseShared(long ignored) {
+                return true;
+            }
+        }
+
+        final Sync s = new Sync();
+
+        final Thread thread = newStartedThread(new CheckedRunnable() {
+            public void realRun() {
+                try {
+                    if (ThreadLocalRandom.current().nextBoolean())
+                        s.acquire(1);
+                    else
+                        s.acquireShared(1);
+                    shouldThrow();
+                } catch (Throwable t) {
+                    assertSame(ex, t);
+                    assertTrue(Thread.interrupted());
+                }
+            }});
+        waitForThreadToEnterWaitState(thread);
+        assertSame(thread, s.getFirstQueuedThread());
+        assertTrue(s.hasQueuedPredecessors());
+        assertTrue(s.hasQueuedThreads());
+        assertEquals(1, s.getQueueLength());
+
+        s.pleaseThrow = true;
+        thread.interrupt();
+        s.release(1);
+        awaitTermination(thread);
+    }
+
 }
--- a/test/jdk/java/util/concurrent/tck/AbstractQueuedSynchronizerTest.java	Fri Dec 01 22:04:03 2017 -0800
+++ b/test/jdk/java/util/concurrent/tck/AbstractQueuedSynchronizerTest.java	Sat Dec 02 10:03:41 2017 -0800
@@ -36,9 +36,11 @@
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.concurrent.TimeUnit.NANOSECONDS;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashSet;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.locks.AbstractQueuedSynchronizer;
 import java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject;
 
@@ -1286,4 +1288,105 @@
         sync.release();
     }
 
+    /**
+     * Disabled demo test for (unfixed as of 2017-11)
+     * JDK-8191483: AbstractQueuedSynchronizer cancel/cancel race
+     * ant -Djsr166.tckTestClass=AbstractQueuedSynchronizerTest -Djsr166.methodFilter=testCancelCancelRace -Djsr166.runsPerTest=100 tck
+     */
+    public void DISABLED_testCancelCancelRace() throws InterruptedException {
+        class Sync extends AbstractQueuedSynchronizer {
+            protected boolean tryAcquire(int acquires) {
+                return !hasQueuedPredecessors() && compareAndSetState(0, 1);
+            }
+            protected boolean tryRelease(int releases) {
+                return compareAndSetState(1, 0);
+            }
+        }
+
+        Sync s = new Sync();
+        s.acquire(1);           // acquire to force other threads to enqueue
+
+        // try to trigger double cancel race with two background threads
+        ArrayList<Thread> threads = new ArrayList<>();
+        Runnable failedAcquire = () -> {
+            try {
+                s.acquireInterruptibly(1);
+                shouldThrow();
+            } catch (InterruptedException expected) {}
+        };
+        for (int i = 0; i < 2; i++) {
+            Thread thread = new Thread(failedAcquire);
+            thread.start();
+            threads.add(thread);
+        }
+        Thread.sleep(100);
+        for (Thread thread : threads) thread.interrupt();
+        for (Thread thread : threads) awaitTermination(thread);
+
+        s.release(1);
+
+        // no one holds lock now, we should be able to acquire
+        if (!s.tryAcquire(1))
+            throw new RuntimeException(
+                String.format(
+                    "Broken: hasQueuedPredecessors=%s hasQueuedThreads=%s queueLength=%d firstQueuedThread=%s",
+                    s.hasQueuedPredecessors(),
+                    s.hasQueuedThreads(),
+                    s.getQueueLength(),
+                    s.getFirstQueuedThread()));
+    }
+
+    /**
+     * Tests scenario for
+     * JDK-8191937: Lost interrupt in AbstractQueuedSynchronizer when tryAcquire methods throw
+     */
+    public void testInterruptedFailingAcquire() throws InterruptedException {
+        final RuntimeException ex = new RuntimeException();
+
+        // A synchronizer only offering a choice of failure modes
+        class Sync extends AbstractQueuedSynchronizer {
+            boolean pleaseThrow;
+            @Override protected boolean tryAcquire(int ignored) {
+                if (pleaseThrow) throw ex;
+                return false;
+            }
+            @Override protected int tryAcquireShared(int ignored) {
+                if (pleaseThrow) throw ex;
+                return -1;
+            }
+            @Override protected boolean tryRelease(int ignored) {
+                return true;
+            }
+            @Override protected boolean tryReleaseShared(int ignored) {
+                return true;
+            }
+        }
+
+        final Sync s = new Sync();
+
+        final Thread thread = newStartedThread(new CheckedRunnable() {
+            public void realRun() {
+                try {
+                    if (ThreadLocalRandom.current().nextBoolean())
+                        s.acquire(1);
+                    else
+                        s.acquireShared(1);
+                    shouldThrow();
+                } catch (Throwable t) {
+                    assertSame(ex, t);
+                    assertTrue(Thread.interrupted());
+                }
+            }});
+        waitForThreadToEnterWaitState(thread);
+        assertSame(thread, s.getFirstQueuedThread());
+        assertTrue(s.hasQueuedPredecessors());
+        assertTrue(s.hasQueuedThreads());
+        assertEquals(1, s.getQueueLength());
+
+        s.pleaseThrow = true;
+        thread.interrupt();
+        s.release(1);
+        awaitTermination(thread);
+    }
+
 }