changeset 21981:48b31d370bc9

8028564: Concurrent calls to CHM.put can fail to add the key/value to the map Reviewed-by: psandoz, chegar, alanb Contributed-by: Doug Lea <dl@cs.oswego.edu>
author psandoz
date Thu, 05 Dec 2013 09:44:53 +0100
parents 393509a81cc3
children fd6e5fe509df
files jdk/src/share/classes/java/util/concurrent/ConcurrentHashMap.java jdk/test/java/util/concurrent/ConcurrentHashMap/ConcurrentAssociateTest.java jdk/test/java/util/concurrent/ConcurrentHashMap/ConcurrentContainsKeyTest.java
diffstat 3 files changed, 364 insertions(+), 39 deletions(-) [+]
line wrap: on
line diff
--- a/jdk/src/share/classes/java/util/concurrent/ConcurrentHashMap.java	Thu Dec 05 09:25:31 2013 +0100
+++ b/jdk/src/share/classes/java/util/concurrent/ConcurrentHashMap.java	Thu Dec 05 09:44:53 2013 +0100
@@ -49,7 +49,6 @@
 import java.util.Iterator;
 import java.util.Map;
 import java.util.NoSuchElementException;
-import java.util.Objects;
 import java.util.Set;
 import java.util.Spliterator;
 import java.util.concurrent.ConcurrentMap;
@@ -381,19 +380,21 @@
      * progress.  Resizing proceeds by transferring bins, one by one,
      * from the table to the next table. However, threads claim small
      * blocks of indices to transfer (via field transferIndex) before
-     * doing so, reducing contention.  Because we are using
-     * power-of-two expansion, the elements from each bin must either
-     * stay at same index, or move with a power of two offset. We
-     * eliminate unnecessary node creation by catching cases where old
-     * nodes can be reused because their next fields won't change.  On
-     * average, only about one-sixth of them need cloning when a table
-     * doubles. The nodes they replace will be garbage collectable as
-     * soon as they are no longer referenced by any reader thread that
-     * may be in the midst of concurrently traversing table.  Upon
-     * transfer, the old table bin contains only a special forwarding
-     * node (with hash field "MOVED") that contains the next table as
-     * its key. On encountering a forwarding node, access and update
-     * operations restart, using the new table.
+     * doing so, reducing contention.  A generation stamp in field
+     * sizeCtl ensures that resizings do not overlap. Because we are
+     * using power-of-two expansion, the elements from each bin must
+     * either stay at same index, or move with a power of two
+     * offset. We eliminate unnecessary node creation by catching
+     * cases where old nodes can be reused because their next fields
+     * won't change.  On average, only about one-sixth of them need
+     * cloning when a table doubles. The nodes they replace will be
+     * garbage collectable as soon as they are no longer referenced by
+     * any reader thread that may be in the midst of concurrently
+     * traversing table.  Upon transfer, the old table bin contains
+     * only a special forwarding node (with hash field "MOVED") that
+     * contains the next table as its key. On encountering a
+     * forwarding node, access and update operations restart, using
+     * the new table.
      *
      * Each bin transfer requires its bin lock, which can stall
      * waiting for locks while resizing. However, because other
@@ -570,6 +571,23 @@
      */
     private static final int MIN_TRANSFER_STRIDE = 16;
 
+    /**
+     * The number of bits used for generation stamp in sizeCtl.
+     * Must be at least 6 for 32bit arrays.
+     */
+    private static int RESIZE_STAMP_BITS = 16;
+
+    /**
+     * The maximum number of threads that can help resize.
+     * Must fit in 32 - RESIZE_STAMP_BITS bits.
+     */
+    private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
+
+    /**
+     * The bit shift for recording size stamp in sizeCtl.
+     */
+    private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
+
     /*
      * Encodings for Node hash fields. See above for explanation.
      */
@@ -727,7 +745,7 @@
      * errors by users, these checks must operate on local variables,
      * which accounts for some odd-looking inline assignments below.
      * Note that calls to setTabAt always occur within locked regions,
-     * and so in principle require only release ordering, not need
+     * and so in principle require only release ordering, not
      * full volatile semantics, but are currently coded as volatile
      * writes to be conservative.
      */
@@ -2192,6 +2210,14 @@
     /* ---------------- Table Initialization and Resizing -------------- */
 
     /**
+     * Returns the stamp bits for resizing a table of size n.
+     * Must be negative when shifted left by RESIZE_STAMP_SHIFT.
+     */
+    static final int resizeStamp(int n) {
+        return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
+    }
+
+    /**
      * Initializes table, using the size recorded in sizeCtl.
      */
     private final Node<K,V>[] initTable() {
@@ -2245,17 +2271,20 @@
             s = sumCount();
         }
         if (check >= 0) {
-            Node<K,V>[] tab, nt; int sc;
+            Node<K,V>[] tab, nt; int n, sc;
             while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
-                   tab.length < MAXIMUM_CAPACITY) {
+                   (n = tab.length) < MAXIMUM_CAPACITY) {
+                int rs = resizeStamp(n);
                 if (sc < 0) {
-                    if (sc == -1 || transferIndex <= 0 ||
-                        (nt = nextTable) == null)
+                    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
+                        sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
+                        transferIndex <= 0)
                         break;
-                    if (U.compareAndSwapInt(this, SIZECTL, sc, sc - 1))
+                    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                         transfer(tab, nt);
                 }
-                else if (U.compareAndSwapInt(this, SIZECTL, sc, -2))
+                else if (U.compareAndSwapInt(this, SIZECTL, sc,
+                                             (rs << RESIZE_STAMP_SHIFT) + 2))
                     transfer(tab, null);
                 s = sumCount();
             }
@@ -2267,11 +2296,15 @@
      */
     final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
         Node<K,V>[] nextTab; int sc;
-        if ((f instanceof ForwardingNode) &&
+        if (tab != null && (f instanceof ForwardingNode) &&
             (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
-            while (transferIndex > 0 && nextTab == nextTable &&
-                   (sc = sizeCtl) < -1) {
-                if (U.compareAndSwapInt(this, SIZECTL, sc, sc - 1)) {
+            int rs = resizeStamp(tab.length);
+            while (nextTab == nextTable && table == tab &&
+                   (sc = sizeCtl) < 0) {
+                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
+                    sc == rs + MAX_RESIZERS || transferIndex <= 0)
+                    break;
+                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
                     transfer(tab, nextTab);
                     break;
                 }
@@ -2309,9 +2342,21 @@
             }
             else if (c <= sc || n >= MAXIMUM_CAPACITY)
                 break;
-            else if (tab == table &&
-                     U.compareAndSwapInt(this, SIZECTL, sc, -2))
-                transfer(tab, null);
+            else if (tab == table) {
+                int rs = resizeStamp(n);
+                if (sc < 0) {
+                    Node<K,V>[] nt;
+                    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
+                        sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
+                        transferIndex <= 0)
+                        break;
+                    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
+                        transfer(tab, nt);
+                }
+                else if (U.compareAndSwapInt(this, SIZECTL, sc,
+                                             (rs << RESIZE_STAMP_SHIFT) + 2))
+                    transfer(tab, null);
+            }
         }
     }
 
@@ -2366,8 +2411,8 @@
                     sizeCtl = (n << 1) - (n >>> 1);
                     return;
                 }
-                if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, ++sc)) {
-                    if (sc != -1)
+                if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
+                    if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                         return;
                     finishing = advance = true;
                     i = n; // recheck before commit
@@ -2566,11 +2611,8 @@
     private final void treeifyBin(Node<K,V>[] tab, int index) {
         Node<K,V> b; int n, sc;
         if (tab != null) {
-            if ((n = tab.length) < MIN_TREEIFY_CAPACITY) {
-                if (tab == table && (sc = sizeCtl) >= 0 &&
-                    U.compareAndSwapInt(this, SIZECTL, sc, -2))
-                    transfer(tab, null);
-            }
+            if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
+                tryPresize(n << 1);
             else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
                 synchronized (b) {
                     if (tabAt(tab, index) == b) {
@@ -2768,7 +2810,7 @@
         private final void contendedLock() {
             boolean waiting = false;
             for (int s;;) {
-                if (((s = lockState) & WRITER) == 0) {
+                if (((s = lockState) & ~WAITER) == 0) {
                     if (U.compareAndSwapInt(this, LOCKSTATE, s, WRITER)) {
                         if (waiting)
                             waiter = null;
@@ -2793,12 +2835,13 @@
          */
         final Node<K,V> find(int h, Object k) {
             if (k != null) {
-                for (Node<K,V> e = first; e != null; e = e.next) {
+                for (Node<K,V> e = first; e != null; ) {
                     int s; K ek;
                     if (((s = lockState) & (WAITER|WRITER)) != 0) {
                         if (e.hash == h &&
                             ((ek = e.key) == k || (ek != null && k.equals(ek))))
                             return e;
+                        e = e.next;
                     }
                     else if (U.compareAndSwapInt(this, LOCKSTATE, s,
                                                  s + READER)) {
@@ -4454,7 +4497,7 @@
         }
 
         public final boolean removeAll(Collection<?> c) {
-            Objects.requireNonNull(c);
+            if (c == null) throw new NullPointerException();
             boolean modified = false;
             for (Iterator<E> it = iterator(); it.hasNext();) {
                 if (c.contains(it.next())) {
@@ -4466,7 +4509,7 @@
         }
 
         public final boolean retainAll(Collection<?> c) {
-            Objects.requireNonNull(c);
+            if (c == null) throw new NullPointerException();
             boolean modified = false;
             for (Iterator<E> it = iterator(); it.hasNext();) {
                 if (!c.contains(it.next())) {
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/jdk/test/java/util/concurrent/ConcurrentHashMap/ConcurrentAssociateTest.java	Thu Dec 05 09:44:53 2013 +0100
@@ -0,0 +1,145 @@
+/*
+ * Copyright (c) 2013, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+import org.testng.annotations.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.BiConsumer;
+import java.util.function.Supplier;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+/**
+ * @test
+ * @bug 8028564
+ * @run testng ConcurrentAssociateTest
+ * @summary Test that association operations, such as put and compute,
+ * place entries in the map
+ */
+@Test
+public class ConcurrentAssociateTest {
+
+    // The number of entries for each thread to place in a map
+    private static final int N = Integer.getInteger("n", 128);
+    // The number of iterations of the test
+    private static final int I = Integer.getInteger("i", 256);
+
+    // Object to be placed in the concurrent map
+    static class X {
+        // Limit the hash code to trigger collisions
+        int hc = ThreadLocalRandom.current().nextInt(1, 9);
+
+        public int hashCode() { return hc; }
+    }
+
+    @Test
+    public void testPut() {
+        test("CHM.put", (m, o) -> m.put(o, o));
+    }
+
+    @Test
+    public void testCompute() {
+        test("CHM.compute", (m, o) -> m.compute(o, (k, v) -> o));
+    }
+
+    @Test
+    public void testComputeIfAbsent() {
+        test("CHM.computeIfAbsent", (m, o) -> m.computeIfAbsent(o, (k) -> o));
+    }
+
+    @Test
+    public void testMerge() {
+        test("CHM.merge", (m, o) -> m.merge(o, o, (v1, v2) -> v1));
+    }
+
+    @Test
+    public void testPutAll() {
+        test("CHM.putAll", (m, o) -> {
+            Map<Object, Object> hm = new HashMap<>();
+            hm.put(o, o);
+            m.putAll(hm);
+        });
+    }
+
+    private static void test(String desc, BiConsumer<ConcurrentMap<Object, Object>, Object> associator) {
+        for (int i = 0; i < I; i++) {
+            testOnce(desc, associator);
+        }
+    }
+
+    static class AssociationFailure extends RuntimeException {
+        AssociationFailure(String message) {
+            super(message);
+        }
+    }
+
+    private static void testOnce(String desc, BiConsumer<ConcurrentMap<Object, Object>, Object> associator) {
+        ConcurrentHashMap<Object, Object> m = new ConcurrentHashMap<>();
+        CountDownLatch s = new CountDownLatch(1);
+
+        Supplier<Runnable> sr = () -> () -> {
+            try {
+                s.await();
+            }
+            catch (InterruptedException e) {
+            }
+
+            for (int i = 0; i < N; i++) {
+                Object o = new X();
+                associator.accept(m, o);
+                if (!m.containsKey(o)) {
+                    throw new AssociationFailure(desc + " failed: entry does not exist");
+                }
+            }
+        };
+
+        int ps = Runtime.getRuntime().availableProcessors();
+        Stream<CompletableFuture> runners = IntStream.range(0, ps)
+                .mapToObj(i -> sr.get())
+                .map(CompletableFuture::runAsync);
+
+        CompletableFuture all = CompletableFuture.allOf(
+                runners.toArray(CompletableFuture[]::new));
+
+        // Trigger the runners to start associating
+        s.countDown();
+        try {
+            all.join();
+        } catch (CompletionException e) {
+            Throwable t = e.getCause();
+            if (t instanceof AssociationFailure) {
+                throw (AssociationFailure) t;
+            }
+            else {
+                throw e;
+            }
+        }
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/jdk/test/java/util/concurrent/ConcurrentHashMap/ConcurrentContainsKeyTest.java	Thu Dec 05 09:44:53 2013 +0100
@@ -0,0 +1,137 @@
+/*
+ * Copyright (c) 2013, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+import org.testng.annotations.Test;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.function.Supplier;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+/**
+ * @test
+ * @bug 8028564
+ * @run testng ConcurrentContainsKeyTest
+ * @summary Test that entries are always present in the map,
+ * when entries are held within one bin that is a tree
+ */
+@Test
+public class ConcurrentContainsKeyTest {
+
+    // The number of entries for each thread to place in a map
+    // Should be > ConcurrentHashMap.TREEIFY_THRESHOLD but small
+    // enough to allow for enough iteration overlap by multiple threads
+    private static final int N = Integer.getInteger("n", 16);
+    // The number of rounds each thread performs per entry
+    private static final int R = Integer.getInteger("r", 32);
+    // The number of iterations of the test
+    private static final int I = Integer.getInteger("i", 256);
+
+    // Object to be placed in the concurrent map
+    static class X implements Comparable<X> {
+
+        private final int a;
+
+        X(int a) {
+            this.a = a;
+        }
+
+        public int compareTo(X o) {
+            return this.a - o.a;
+        }
+
+        public int hashCode() {
+            // Return the same hash code to guarantee collisions
+            return 0;
+        }
+    }
+
+    @Test
+    public void testContainsKey() {
+        X[] content = IntStream.range(0, N).mapToObj(i -> new X(i)).toArray(X[]::new);
+        // Create map with an initial size >= ConcurrentHashMap.TREEIFY_THRESHOLD
+        // ensuring tree'ification will occur for a small number of entries
+        // with the same hash code
+        ConcurrentHashMap<Object, Object> m = new ConcurrentHashMap<>(64);
+        Stream.of(content).forEach(x -> m.put(x, x));
+        test(content, m);
+    }
+
+
+    private static void test(X[] content, ConcurrentHashMap<Object, Object> m) {
+        for (int i = 0; i < I; i++) {
+            testOnce(content, m);
+        }
+    }
+
+    static class AssociationFailure extends RuntimeException {
+        AssociationFailure(String message) {
+            super(message);
+        }
+    }
+
+    private static void testOnce(Object[] content, ConcurrentHashMap<Object, Object> m) {
+        CountDownLatch s = new CountDownLatch(1);
+
+        Supplier<Runnable> sr = () -> () -> {
+            try {
+                s.await();
+            }
+            catch (InterruptedException e) {
+            }
+
+            for (int i = 0; i < R * N; i++) {
+                Object o = content[i % content.length];
+                if (!m.containsKey(o)) {
+                    throw new AssociationFailure("CHM.containsKey failed: entry does not exist");
+                }
+            }
+        };
+
+        int ps = Runtime.getRuntime().availableProcessors();
+        Stream<CompletableFuture> runners = IntStream.range(0, ps)
+                .mapToObj(i -> sr.get())
+                .map(CompletableFuture::runAsync);
+
+        CompletableFuture all = CompletableFuture.allOf(
+                runners.toArray(CompletableFuture[]::new));
+
+        // Trigger the runners to start checking key membership
+        s.countDown();
+        try {
+            all.join();
+        }
+        catch (CompletionException e) {
+            Throwable t = e.getCause();
+            if (t instanceof AssociationFailure) {
+                throw (AssociationFailure) t;
+            }
+            else {
+                throw e;
+            }
+        }
+    }
+}