changeset 7597:c436657091ee

Consolidate Collectors and ConcurrentCollectors; consolidate many groupBy/partitionBy forms into fewer forms; more tests
author briangoetz
date Sun, 10 Mar 2013 17:25:18 -0400
parents fb7b7275d220
children a36b274075f5
files src/share/classes/java/util/stream/Collector.java src/share/classes/java/util/stream/Collectors.java src/share/classes/java/util/stream/ConcurrentCollectors.java test-ng/tests/org/openjdk/tests/java/util/stream/ReduceByOpTest.java test-ng/tests/org/openjdk/tests/java/util/stream/TabulatorsTest.java
diffstat 5 files changed, 454 insertions(+), 350 deletions(-) [+]
line wrap: on
line diff
--- a/src/share/classes/java/util/stream/Collector.java	Fri Mar 08 21:54:17 2013 -0800
+++ b/src/share/classes/java/util/stream/Collector.java	Sun Mar 10 17:25:18 2013 -0400
@@ -52,7 +52,6 @@
  * @see Stream#collect(Collector)
  * @see Stream#collectUnordered(Collector)
  * @see Collectors
- * @see ConcurrentCollectors
  *
  * @param <T> The type of input element to the collect operation
  * @param <R> THe result type of the collect operation
--- a/src/share/classes/java/util/stream/Collectors.java	Fri Mar 08 21:54:17 2013 -0800
+++ b/src/share/classes/java/util/stream/Collectors.java	Sun Mar 10 17:25:18 2013 -0400
@@ -28,6 +28,7 @@
 import java.util.AbstractSet;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -39,6 +40,8 @@
 import java.util.OptionalLong;
 import java.util.Set;
 import java.util.StringJoiner;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.function.BiConsumer;
 import java.util.function.BinaryOperator;
 import java.util.function.DoubleConsumer;
@@ -200,37 +203,37 @@
      * @return A {@code Collection} containing all the input elements, in encounter order
      */
     public static<T, C extends Collection<T>>
-    Collector<T,C> toCollection(Supplier<C> collectionFactory) {
+    Collector<T, C> toCollection(Supplier<C> collectionFactory) {
         return leftCombining(collectionFactory, Collection::add, Collection::addAll);
     }
 
     /**
-     * Accumulate elements into a new {@code List}.
+     * Accumulate elements into a {@code List}.
      *
      * @param <T> The type of the input elements
      * @return A {@code List} containing all the input elements, in encounter order
      */
     public static<T>
-    Collector<T,List<T>> toList() {
+    Collector<T, List<T>> toList() {
         // Consider a tree-based List?
         return toCollection(ArrayList::new);
     }
 
     /**
-     * Accumulate elements into a new {@code Set}.
+     * Accumulate elements into a {@code Set}.
      *
      * @param <T> The type of the input elements
      * @return A {@code Set} containing all the input elements
      */
     public static<T>
-    Collector<T,Set<T>> toSet() {
+    Collector<T, Set<T>> toSet() {
         // @@@ Declare that the collector is NOT_ORDERED so the reduce op can declare NOT_ORDERED in
         //     the terminal op flags
         return toCollection(HashSet::new);
     }
 
     /**
-     * Accumulate {@code String} elements into a new {@link StringBuilder}.
+     * Accumulate {@code String} elements into a {@link StringBuilder}.
      *
      * @return A {@code StringBuilder} containing all of the input elements concatenated in encounter order
      */
@@ -239,7 +242,7 @@
     }
 
     /**
-     * Accumulate {@code String} elements into a new {@link StringJoiner}.
+     * Accumulate {@code String} elements into a {@link StringJoiner}, using the specified separator.
      *
      * @return A {@code StringJoiner} containing all of the input elements concatenated in encounter order
      */
@@ -259,7 +262,19 @@
 
     /**
      * Given a function from {@code T} to {@code U}, adapt a {@code Collector<U,R>} to a
-     * {@code Collector<T,R>} which applies the provided function to each input element.
+     * {@code Collector<T,R>} which operates by applying the provided function to each input
+     * element before the accumulation step.
+     *
+     * @apiNote
+     * The {@code mapping()} collectors are most useful when used in a multi-level collection
+     * following a {@code groupingBy} or {@code partitioningBy} collection.
+     * For example, given a stream of {@code Person}, to accumulate the set of last names in
+     * each city:
+     * <pre>
+     *     Map<City, Set<String>> lastNamesByCity
+     *         = people.stream().collect(groupingBy(Person::getCity)
+     *                                   .then(mapping(Person::getLastName, toSet())));
+     * </pre>
      *
      * @param <T> Type of values to be accepted
      * @param <U> Type of values accepted by provided collector
@@ -300,65 +315,238 @@
                                    downstream.combiner(), downstream.isConcurrent());
     }
 
-    public static <T, K>
-    Collector<T, Map<K, Collection<T>>> groupingBy(Function<? super T, ? extends K> classifier) {
-        return Collectors.groupingBy(classifier, HashMap::new, ArrayList::new);
+
+    public static<T, K> GroupingCollector<T, K> groupingBy(Function<? super T, ? extends K> classifier) {
+        return groupingBy(classifier, HashMap::new);
     }
 
-    public static <T, K, C extends Collection<T>, M extends Map<K, C>>
-    Collector<T, M> groupingBy(Function<? super T, ? extends K> classifier,
-                               Supplier<M> mapFactory,
-                               Supplier<C> rowFactory) {
-        return groupingBy(classifier, mapFactory, toCollection(rowFactory));
+    public static<T, K> GroupingCollector<T, K> groupingBy(Function<? super T, ? extends K> classifier,
+                                                           Supplier<? extends Map> mapFactory) {
+        return new GroupingCollector<T, K>() {
+            @Override
+            public <D> Collector<T, Map<K, D>> then(Collector<T, D> downstream) {
+                Supplier<D> downstreamSupplier = downstream.resultSupplier();
+                BiConsumer<D, T> downstreamAccumulator = downstream.accumulator();
+                BinaryOperator<D> downstreamCombiner = downstream.combiner();
+                BiConsumer<Map<K, D>, T> accumulator = (m, t) -> {
+                    K key = Objects.requireNonNull(classifier.apply(t), "element cannot be mapped to a null key");
+                    downstreamAccumulator.accept(m.computeIfAbsent(key, k -> downstreamSupplier.get()), t);
+                };
+                return new CollectorImpl<>((Supplier<Map<K, D>>) mapFactory, accumulator,
+                                           Collectors.<K, D, Map<K, D>>leftMapMerger(downstreamCombiner));
+            }
+
+            @Override
+            public Collector<T, Map<K, T>> thenReducing(BinaryOperator<T> reducer) {
+                return thenReducing(Functions.identity(), reducer);
+            }
+
+            @Override
+            public <U> Collector<T, Map<K, U>> thenReducing(Function<? super T, ? extends U> mapper,
+                                                            BinaryOperator<U> reducer) {
+                BiConsumer<Map<K, U>, T> accumulator = (map, value) -> {
+                    map.merge(Objects.requireNonNull(classifier.apply(value), "element cannot be mapped to a null key"),
+                              mapper.apply(value), reducer);
+                };
+                return new CollectorImpl<>((Supplier<Map<K, U>>) mapFactory, accumulator,
+                                           Collectors.<K, U, Map<K, U>>leftMapMerger(reducer));
+            }
+
+            @Override
+            public Supplier<Map<K, Collection<T>>> resultSupplier() {
+                return (Supplier<Map<K, Collection<T>>>) mapFactory;
+            }
+
+            @Override
+            public BiConsumer<Map<K, Collection<T>>, T> accumulator() {
+                return (map, value) -> map.computeIfAbsent(classifier.apply(value), k -> new ArrayList<T>()).add(value);
+            }
+
+            @Override
+            public BinaryOperator<Map<K, Collection<T>>> combiner() {
+                return Collectors.<K, Collection<T>, Map<K, Collection<T>>>leftMapMerger((left, right) -> {
+                    left.addAll(right);
+                    return left;
+                });
+            }
+        };
     }
 
-    public static <T, K, D> Collector<T, Map<K, D>> groupingBy(Function<? super T, ? extends K> classifier,
-                                                               Collector<T, D> downstream) {
-        return groupingBy(classifier, HashMap::new, downstream);
+    public static<T, K> GroupingCollector<T, K> groupingByConcurrent(Function<? super T, ? extends K> classifier) {
+        return groupingByConcurrent(classifier, ConcurrentHashMap::new);
     }
 
-    public static <T, K, D, M extends Map<K, D>> Collector<T, M> groupingBy(Function<? super T, ? extends K> classifier,
-                                                                            Supplier<M> mapFactory,
-                                                                            Collector<T, D> downstream) {
-        Supplier<D> downstreamSupplier = downstream.resultSupplier();
-        BiConsumer<D, T> downstreamAccumulator = downstream.accumulator();
-        BinaryOperator<D> downstreamCombiner = downstream.combiner();
-        BiConsumer<M, T> accumulator = (m, t) -> {
-            K key = Objects.requireNonNull(classifier.apply(t), "element cannot be mapped to a null key");
-            downstreamAccumulator.accept(m.computeIfAbsent(key, k -> downstreamSupplier.get()), t);
+    public static<T, K> GroupingCollector<T, K> groupingByConcurrent(Function<? super T, ? extends K> classifier,
+                                                                     Supplier<? extends ConcurrentMap> mapFactory) {
+        return new GroupingCollector<T, K>() {
+            @Override
+            public <D> Collector<T, Map<K, D>> then(Collector<T, D> downstream) {
+                Supplier<D> downstreamSupplier = downstream.resultSupplier();
+                BiConsumer<D, T> downstreamAccumulator = downstream.accumulator();
+                BiConsumer<D, T> wrappedAccumulator
+                        = downstream.isConcurrent()
+                          ? downstreamAccumulator
+                          : (d, t) -> { synchronized(d) { downstreamAccumulator.accept(d, t); } };
+                BiConsumer<Map<K, D>, T> accumulator = (m, t) -> {
+                    K key = Objects.requireNonNull(classifier.apply(t), "element cannot be mapped to a null key");
+                    wrappedAccumulator.accept(m.computeIfAbsent(key, k -> downstreamSupplier.get()), t);
+                };
+                return new Collectors.CollectorImpl<>((Supplier) mapFactory, accumulator,
+                                                      leftMapMerger(downstream.combiner()), true);
+            }
+
+            @Override
+            public Collector<T, Map<K, T>> thenReducing(BinaryOperator<T> reducer) {
+                return thenReducing(Functions.identity(), reducer);
+            }
+
+            @Override
+            public <U> Collector<T, Map<K, U>> thenReducing(Function<? super T, ? extends U> mapper,
+                                                            BinaryOperator<U> reducer) {
+                BiConsumer<Map<K, U>, T> accumulator
+                        = (map, value) -> map.merge(classifier.apply(value), mapper.apply(value), reducer);
+                return new Collectors.CollectorImpl<>((Supplier) mapFactory, accumulator, leftMapMerger(reducer), true);
+            }
+
+            @Override
+            public Supplier<Map<K, Collection<T>>> resultSupplier() {
+                return (Supplier) mapFactory;
+            }
+
+            @Override
+            public BiConsumer<Map<K, Collection<T>>, T> accumulator() {
+                return (map, value) -> map.computeIfAbsent(classifier.apply(value), k -> new ArrayList<T>()).add(value);
+            }
+
+            @Override
+            public BinaryOperator<Map<K, Collection<T>>> combiner() {
+                return Collectors.<K, Collection<T>, Map<K, Collection<T>>>leftMapMerger((left, right) -> {
+                    left.addAll(right);
+                    return left;
+                });
+            }
         };
-        return new CollectorImpl<>(mapFactory, accumulator, Collectors.<K, D, M>leftMapMerger(downstreamCombiner));
     }
 
-    public static <T, K> Collector<T, Map<K,T>> groupingReduce(Function<? super T, ? extends K> classifier,
-                                                               BinaryOperator<T> reducer) {
-        return groupingReduce(classifier, HashMap::new, Functions.identity(), reducer);
+    public static<T> PartitioningCollector<T> partitioningBy(Predicate<? super T> predicate) {
+        return new PartitioningCollector<T>() {
+            @Override
+            public <D> Collector<T, Map<Boolean, D>> then(Collector<T, D> downstream) {
+                BiConsumer<D, T> downstreamAccumulator = downstream.accumulator();
+                BiConsumer<Map<Boolean, D>, T> accumulator = (result, t) -> {
+                    Partition<D> asPartition = ((Partition<D>) result);
+                    downstreamAccumulator.accept(predicate.test(t) ? asPartition.forTrue : asPartition.forFalse, t);
+                };
+                return new CollectorImpl<>(() -> new Partition<>(downstream.resultSupplier().get(),
+                                                                 downstream.resultSupplier().get()),
+                                           accumulator, leftPartitionMerger(downstream.combiner()));
+            }
+
+            @Override
+            public Collector<T, Map<Boolean, T>> thenReducing(T identity, BinaryOperator<T> reducer) {
+                return thenReducing(identity, Functions.identity(), reducer);
+            }
+
+            @Override
+            public <U> Collector<T, Map<Boolean, U>> thenReducing(U identity,
+                                                                  Function<? super T, ? extends U> mapper,
+                                                                  BinaryOperator<U> reducer) {
+                BiConsumer<Map<Boolean, U>, T> accumulator = (result, t) -> {
+                    Partition<U> asPartition = ((Partition<U>) result);
+                    if (predicate.test(t))
+                        asPartition.forTrue = reducer.apply(asPartition.forTrue, mapper.apply(t));
+                    else
+                        asPartition.forFalse = reducer.apply(asPartition.forFalse, mapper.apply(t));
+                };
+                return new CollectorImpl<>(() -> new Partition<>(identity, identity),
+                                           accumulator, leftPartitionMerger(reducer));
+            }
+
+            @Override
+            public Supplier<Map<Boolean, Collection<T>>> resultSupplier() {
+                return () -> new Partition<Collection<T>>(new ArrayList<>(), new ArrayList<>());
+            }
+
+            @Override
+            public BiConsumer<Map<Boolean, Collection<T>>, T> accumulator() {
+                return (result, t) -> {
+                    Partition<Collection<T>> asPartition = ((Partition<Collection<T>>) result);
+                    (predicate.test(t) ? asPartition.forTrue : asPartition.forFalse).add(t);
+                };
+            }
+
+            @Override
+            public BinaryOperator<Map<Boolean, Collection<T>>> combiner() {
+                return leftPartitionMerger((Collection<T> left, Collection<T> right) -> { left.addAll(right); return left; });
+            }
+        };
     }
 
-    public static <T, K, M extends Map<K, T>> Collector<T, M>
-    groupingReduce(Function<? super T, ? extends K> classifier,
-                   Supplier<M> mapFactory,
-                   BinaryOperator<T> reducer) {
-        return groupingReduce(classifier, mapFactory, Functions.identity(), reducer);
-    }
+    public static<T> PartitioningCollector<T> partitioningByConcurrent(Predicate<? super T> predicate) {
+        return new PartitioningCollector<T>() {
+            @Override
+            public <D> Collector<T, Map<Boolean, D>> then(Collector<T, D> downstream) {
+                BiConsumer<D, T> downstreamAccumulator = downstream.accumulator();
+                BiConsumer<D, T> wrappedAccumulator
+                        = downstream.isConcurrent()
+                          ? downstreamAccumulator
+                          : (d, t) -> { synchronized(d) { downstreamAccumulator.accept(d, t); } };
+                BiConsumer<Map<Boolean, D>, T> accumulator = (result, t) -> {
+                    Collectors.Partition<D> asPartition = ((Collectors.Partition<D>) result);
+                    wrappedAccumulator.accept(predicate.test(t) ? asPartition.forTrue : asPartition.forFalse, t);
+                };
+                return new Collectors.CollectorImpl<>(() -> new Collectors.Partition<>(downstream.resultSupplier().get(),
+                                                                                       downstream.resultSupplier().get()),
+                                                      accumulator, Collectors.leftPartitionMerger(downstream.combiner()), true);
+            }
 
-    public static <T, K, D> Collector<T, Map<K,D>>
-    groupingReduce(Function<? super T, ? extends K> classifier,
-                   Function<? super T, ? extends D> mapper,
-                   BinaryOperator<D> reducer) {
-        return groupingReduce(classifier, HashMap::new, mapper, reducer);
-    }
+            @Override
+            public Collector<T, Map<Boolean, T>> thenReducing(T identity, BinaryOperator<T> reducer) {
+                return thenReducing(identity, Functions.identity(), reducer);
+            }
 
-    public static <T, K, D, M extends Map<K, D>> Collector<T, M>
-    groupingReduce(Function<? super T, ? extends K> classifier,
-                   Supplier<M> mapFactory,
-                   Function<? super T, ? extends D> mapper,
-                   BinaryOperator<D> reducer) {
-        BiConsumer<M, T> accumulator = (map, value) -> {
-            map.merge(Objects.requireNonNull(classifier.apply(value), "element cannot be mapped to a null key"),
-                      mapper.apply(value), reducer);
+            @Override
+            public <U> Collector<T, Map<Boolean, U>> thenReducing(U identity,
+                                                                  Function<? super T, ? extends U> mapper,
+                                                                  BinaryOperator<U> reducer) {
+                final Object trueLock = new Object();
+                final Object falseLock = new Object();
+                BiConsumer<Map<Boolean, U>, T> accumulator = (result, t) -> {
+                    Collectors.Partition<U> asPartition = ((Collectors.Partition<U>) result);
+                    if (predicate.test(t)) {
+                        synchronized (trueLock) {
+                            asPartition.forTrue = reducer.apply(asPartition.forTrue, mapper.apply(t));
+                        }
+                    }
+                    else {
+                        synchronized (falseLock) {
+                            asPartition.forFalse = reducer.apply(asPartition.forFalse, mapper.apply(t));
+                        }
+                    }
+                };
+                return new Collectors.CollectorImpl<>(() -> new Collectors.Partition<>(identity, identity),
+                                                      accumulator, Collectors.leftPartitionMerger(reducer), true);
+            }
+
+            @Override
+            public Supplier<Map<Boolean, Collection<T>>> resultSupplier() {
+                return () -> new Partition<Collection<T>>(Collections.synchronizedList(new ArrayList<>()),
+                                                          Collections.synchronizedList(new ArrayList<>()));
+            }
+
+            @Override
+            public BiConsumer<Map<Boolean, Collection<T>>, T> accumulator() {
+                return (result, t) -> {
+                    Partition<Collection<T>> asPartition = ((Partition<Collection<T>>) result);
+                    (predicate.test(t) ? asPartition.forTrue : asPartition.forFalse).add(t);
+                };
+            }
+
+            @Override
+            public BinaryOperator<Map<Boolean, Collection<T>>> combiner() {
+                return leftPartitionMerger((Collection<T> left, Collection<T> right) -> { left.addAll(right); return left; });
+            }
         };
-        return new CollectorImpl<>(mapFactory, accumulator, Collectors.<K, D, M>leftMapMerger(reducer));
     }
 
     public static <T, U> Collector<T, Map<T,U>> toMap(Function<? super T, ? extends U> mapper) {
@@ -372,6 +560,18 @@
         return new CollectorImpl<>(mapSupplier, accumulator, leftMapMerger(mergeFunction));
     }
 
+    public static <T, U> Collector<T, ConcurrentMap<T,U>> toConcurrentMap(Function<? super T, ? extends U> mapper) {
+        return toConcurrentMap(mapper, ConcurrentHashMap::new, throwingMerger());
+    }
+
+    public static <T, U, M extends ConcurrentMap<T, U>>
+    Collector<T, M> toConcurrentMap(Function<? super T, ? extends U> mapper,
+                                    Supplier<M> mapSupplier,
+                                    BinaryOperator<U> mergeFunction) {
+        BiConsumer<M, T> accumulator = (map, value) -> map.merge(value, mapper.apply(value), mergeFunction);
+        return new Collectors.CollectorImpl<>(mapSupplier, accumulator, Collectors.leftMapMerger(mergeFunction), true);
+    }
+
     static final class Partition<T> extends AbstractMap<Boolean, T> implements Map<Boolean, T> {
         T forTrue;
         T forFalse;
@@ -435,16 +635,6 @@
         }
     }
 
-    public static<T> Collector<T, Map<Boolean, Collection<T>>> partitioningBy(Predicate<T> predicate) {
-        return partitioningBy(predicate, ArrayList::new);
-    }
-
-    public static<T, C extends Collection<T>> Collector<T, Map<Boolean, C>>
-    partitioningBy(Predicate<T> predicate,
-                   Supplier<C> rowFactory) {
-        return partitioningBy(predicate, toCollection(rowFactory));
-    }
-
     static<D> BinaryOperator<Map<Boolean, D>> leftPartitionMerger(BinaryOperator<D> op) {
         return (m1, m2) -> {
             Partition<D> left = (Partition<D>) m1;
@@ -455,42 +645,6 @@
         };
     }
 
-    public static<T, D> Collector<T, Map<Boolean, D>>
-    partitioningBy(Predicate<T> predicate,
-                   Collector<T, D> downstream) {
-        BiConsumer<D, T> downstreamAccumulator = downstream.accumulator();
-        BiConsumer<Map<Boolean, D>, T> accumulator = (result, t) -> {
-            Partition<D> asPartition = ((Partition<D>) result);
-            downstreamAccumulator.accept(predicate.test(t) ? asPartition.forTrue : asPartition.forFalse, t);
-        };
-        return new CollectorImpl<>(() -> new Partition<>(downstream.resultSupplier().get(),
-                                                         downstream.resultSupplier().get()),
-                                   accumulator, leftPartitionMerger(downstream.combiner()));
-    }
-
-    public static<T> Collector<T, Map<Boolean, T>>
-    partitioningReduce(Predicate<T> predicate,
-                       T identity,
-                       BinaryOperator<T> reducer) {
-        return partitioningReduce(predicate, identity, Functions.identity(), reducer);
-    }
-
-    public static<T, U> Collector<T, Map<Boolean, U>>
-    partitioningReduce(Predicate<T> predicate,
-                       U identity,
-                       Function<T, U> mapper,
-                       BinaryOperator<U> reducer) {
-        BiConsumer<Map<Boolean, U>, T> accumulator = (result, t) -> {
-            Partition<U> asPartition = ((Partition<U>) result);
-            if (predicate.test(t))
-                asPartition.forTrue = reducer.apply(asPartition.forTrue, mapper.apply(t));
-            else
-                asPartition.forFalse = reducer.apply(asPartition.forFalse, mapper.apply(t));
-        };
-        return new CollectorImpl<>(() -> new Partition<>(identity, identity),
-                                   accumulator, leftPartitionMerger(reducer));
-    }
-
     public static final class LongStatistics implements LongConsumer, IntConsumer {
         private long count;
         private long sum;
@@ -609,4 +763,31 @@
         return new DoubleCollectorImpl<>(DoubleStatistics::new, DoubleStatistics::accept,
                                          (l, r) -> { l.combine(r); return l; });
     }
+
+    /**
+     * GroupingCollector
+     */
+    public static interface GroupingCollector<T, K> extends Collector<T, Map<K, Collection<T>>> {
+
+        <D> Collector<T, Map<K, D>> then(Collector<T, D> downstream);
+
+        Collector<T, Map<K, T>> thenReducing(BinaryOperator<T> reducer);
+
+        <U> Collector<T, Map<K, U>> thenReducing(Function<? super T, ? extends U> mapper,
+                                                 BinaryOperator<U> reducer);
+    }
+
+    /**
+     * PartitioningCollector
+     */
+    public static interface PartitioningCollector<T> extends Collector<T, Map<Boolean, Collection<T>>> {
+
+        <D> Collector<T, Map<Boolean, D>> then(Collector<T, D> downstream);
+
+        Collector<T, Map<Boolean, T>> thenReducing(T identity, BinaryOperator<T> reducer);
+
+        <U> Collector<T, Map<Boolean, U>> thenReducing(U identity,
+                                                       Function<? super T, ? extends U> mapper,
+                                                       BinaryOperator<U> reducer);
+    }
 }
--- a/src/share/classes/java/util/stream/ConcurrentCollectors.java	Fri Mar 08 21:54:17 2013 -0800
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,188 +0,0 @@
-/*
- * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved.
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This code is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 only, as
- * published by the Free Software Foundation.  Oracle designates this
- * particular file as subject to the "Classpath" exception as provided
- * by Oracle in the LICENSE file that accompanied this code.
- *
- * This code is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
- * version 2 for more details (a copy is included in the LICENSE file that
- * accompanied this code).
- *
- * You should have received a copy of the GNU General Public License version
- * 2 along with this work; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
- *
- * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
- * or visit www.oracle.com if you need additional information or have any
- * questions.
- */
-package java.util.stream;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Map;
-import java.util.Objects;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.function.BiConsumer;
-import java.util.function.BinaryOperator;
-import java.util.function.Function;
-import java.util.function.Functions;
-import java.util.function.Predicate;
-import java.util.function.Supplier;
-
-/**
- * ConcurrentReducers
- *
- * @since 1.8
- */
-public final class ConcurrentCollectors {
-
-    private ConcurrentCollectors() {}
-
-    public static <T, K>
-    Collector<T, ConcurrentMap<K, Collection<T>>> groupingBy(Function<? super T, ? extends K> classifier) {
-        return groupingBy(classifier, ConcurrentHashMap::new, ArrayList::new);
-    }
-
-    public static <T, K, C extends Collection<T>, M extends ConcurrentMap<K, C>>
-    Collector<T, M> groupingBy(Function<? super T, ? extends K> classifier,
-                               Supplier<M> mapFactory,
-                               Supplier<C> rowFactory) {
-        return groupingBy(classifier, mapFactory, Collectors.toCollection(rowFactory));
-    }
-
-    public static <T, K, D> Collector<T, ConcurrentMap<K, D>> groupingBy(Function<? super T, ? extends K> classifier,
-                                                                         Collector<T, D> downstream) {
-        return groupingBy(classifier, (Supplier<ConcurrentMap<K, D>>) ConcurrentHashMap::new, downstream);
-    }
-
-    static <T, K, D, M extends ConcurrentMap<K, D>> Collector<T, M> groupingBy(Function<? super T, ? extends K> classifier,
-                                                                               Supplier<M> mapFactory,
-                                                                               Collector<T, D> downstream) {
-        Supplier<D> downstreamSupplier = downstream.resultSupplier();
-        BiConsumer<D, T> downstreamAccumulator = downstream.accumulator();
-        BiConsumer<D, T> wrappedAccumulator
-                = downstream.isConcurrent()
-                  ? downstreamAccumulator
-                  : (d, t) -> { synchronized(d) { downstreamAccumulator.accept(d, t); } };
-        BiConsumer<M, T> accumulator = (m, t) -> {
-            K key = Objects.requireNonNull(classifier.apply(t), "element cannot be mapped to a null key");
-            wrappedAccumulator.accept(m.computeIfAbsent(key, k -> downstreamSupplier.get()), t);
-        };
-        return new Collectors.CollectorImpl<>(mapFactory, accumulator,
-                                              Collectors.leftMapMerger(downstream.combiner()), true);
-    }
-
-    public static <T, K> Collector<T, ConcurrentMap<K,T>> groupingReduce(Function<? super T, ? extends K> classifier,
-                                                                         BinaryOperator<T> reducer) {
-        return groupingReduce(classifier, ConcurrentHashMap::new, Functions.identity(), reducer);
-    }
-
-    public static <T, K, M extends ConcurrentMap<K, T>>
-    Collector<T, M> groupingReduce(Function<? super T, ? extends K> classifier,
-                                   Supplier<M> mapFactory,
-                                   BinaryOperator<T> reducer) {
-        return groupingReduce(classifier, mapFactory, Functions.identity(), reducer);
-    }
-
-    public static <T, K, D> Collector<T, ConcurrentMap<K,D>> groupingReduce(Function<? super T, ? extends K> classifier,
-                                                                            Function<? super T, ? extends D> mapper,
-                                                                            BinaryOperator<D> reducer) {
-        return groupingReduce(classifier, (Supplier<ConcurrentMap<K, D>>) ConcurrentHashMap::new, mapper, reducer);
-    }
-
-    public static <T, K, D, M extends ConcurrentMap<K, D>>
-    Collector<T, M> groupingReduce(Function<? super T, ? extends K> classifier,
-                                   Supplier<M> mapFactory,
-                                   Function<? super T, ? extends D> mapper,
-                                   BinaryOperator<D> reducer) {
-        BiConsumer<M, T> accumulator = (map, value) -> map.merge(classifier.apply(value), mapper.apply(value), reducer);
-        return new Collectors.CollectorImpl<>(mapFactory, accumulator, Collectors.leftMapMerger(reducer), true);
-    }
-
-
-    public static <T, U> Collector<T, ConcurrentMap<T,U>> joiningWith(Function<? super T, ? extends U> mapper) {
-        return joiningWith(mapper, Collectors.throwingMerger());
-    }
-
-    public static <T, U> Collector<T, ConcurrentMap<T,U>> joiningWith(Function<? super T, ? extends U> mapper,
-                                                                      BinaryOperator<U> mergeFunction) {
-        return joiningWith(mapper, mergeFunction, ConcurrentHashMap::new);
-    }
-
-    public static <T, U, M extends ConcurrentMap<T, U>> Collector<T, M> joiningWith(Function<? super T, ? extends U> mapper,
-                                                                                    Supplier<M> mapSupplier) {
-        return joiningWith(mapper, Collectors.throwingMerger(), mapSupplier);
-    }
-
-    public static <T, U, M extends ConcurrentMap<T, U>> Collector<T, M> joiningWith(Function<? super T, ? extends U> mapper,
-                                                                                    BinaryOperator<U> mergeFunction,
-                                                                                    Supplier<M> mapSupplier) {
-        BiConsumer<M, T> accumulator = (map, value) -> map.merge(value, mapper.apply(value), mergeFunction);
-        return new Collectors.CollectorImpl<>(mapSupplier, accumulator, Collectors.leftMapMerger(mergeFunction), true);
-    }
-
-
-    public static<T> Collector<T, Map<Boolean, Collection<T>>> partitioningBy(Predicate<T> predicate) {
-        return partitioningBy(predicate, ArrayList::new);
-    }
-
-    public static<T, C extends Collection<T>> Collector<T, Map<Boolean, C>>
-    partitioningBy(Predicate<T> predicate,
-                   Supplier<C> rowFactory) {
-        return partitioningBy(predicate, Collectors.toCollection(rowFactory));
-    }
-
-    public static<T, D> Collector<T, Map<Boolean, D>>
-    partitioningBy(Predicate<T> predicate,
-                   Collector<T, D> downstream) {
-        BiConsumer<D, T> downstreamAccumulator = downstream.accumulator();
-        BiConsumer<D, T> wrappedAccumulator
-                = downstream.isConcurrent()
-                  ? downstreamAccumulator
-                  : (d, t) -> { synchronized(d) { downstreamAccumulator.accept(d, t); } };
-        BiConsumer<Map<Boolean, D>, T> accumulator = (result, t) -> {
-            Collectors.Partition<D> asPartition = ((Collectors.Partition<D>) result);
-            wrappedAccumulator.accept(predicate.test(t) ? asPartition.forTrue : asPartition.forFalse, t);
-        };
-        return new Collectors.CollectorImpl<>(() -> new Collectors.Partition<>(downstream.resultSupplier().get(),
-                                                                               downstream.resultSupplier().get()),
-                                              accumulator, Collectors.leftPartitionMerger(downstream.combiner()), true);
-    }
-
-    public static<T> Collector<T, Map<Boolean, T>> partitioningReduce(Predicate<T> predicate,
-                                                                      T identity,
-                                                                      BinaryOperator<T> reducer) {
-        return partitioningReduce(predicate, identity, Functions.identity(), reducer);
-    }
-
-    public static<T, U> Collector<T, Map<Boolean, U>> partitioningReduce(Predicate<T> predicate,
-                                                                         U identity,
-                                                                         Function<T, U> mapper,
-                                                                         BinaryOperator<U> reducer) {
-        final Object trueLock = new Object();
-        final Object falseLock = new Object();
-        BiConsumer<Map<Boolean, U>, T> accumulator = (result, t) -> {
-            Collectors.Partition<U> asPartition = ((Collectors.Partition<U>) result);
-            if (predicate.test(t)) {
-                synchronized (trueLock) {
-                    asPartition.forTrue = reducer.apply(asPartition.forTrue, mapper.apply(t));
-                }
-            }
-            else {
-                synchronized (falseLock) {
-                    asPartition.forFalse = reducer.apply(asPartition.forFalse, mapper.apply(t));
-                }
-            }
-        };
-        return new Collectors.CollectorImpl<>(() -> new Collectors.Partition<>(identity, identity),
-                                              accumulator, Collectors.leftPartitionMerger(reducer), true);
-    }
-}
--- a/test-ng/tests/org/openjdk/tests/java/util/stream/ReduceByOpTest.java	Fri Mar 08 21:54:17 2013 -0800
+++ b/test-ng/tests/org/openjdk/tests/java/util/stream/ReduceByOpTest.java	Sun Mar 10 17:25:18 2013 -0400
@@ -50,10 +50,8 @@
     @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
     public void testOps(String name, StreamTestData<Integer> data) {
         Map<Boolean,Collection<Integer>> gbResult = data.stream().collect(Collectors.groupingBy(Functions.forPredicate(pEven, true, false)));
-        Collector<Integer, Map<Boolean, Integer>> collector = Collectors.groupingReduce(Functions.forPredicate(pEven, true, false),
-                                                                                        HashMap::new,
-                                                                                        Functions.identity(), rPlus);
-        Map<Boolean,Integer> result = data.stream().collect(collector);
+        Collector<Integer, Map<Boolean, Integer>> collector = Collectors.groupingBy(Functions.forPredicate(pEven, true, false), HashMap::new).thenReducing(rPlus);
+        Map<Boolean, Integer> result = data.stream().collect(collector);
         assertEquals(result.size(), gbResult.size());
         for (Map.Entry<Boolean, Integer> entry : result.entrySet()) {
             Boolean key = entry.getKey();
@@ -63,7 +61,7 @@
         int uniqueSize = data.into(new HashSet<Integer>()).size();
         Map<Integer, Collection<Integer>> mgResult = exerciseTerminalOps(data,
                                                                          s -> s.collect(Collectors.groupingBy(mId)));
-        Collector<Integer, Map<Integer, Integer>> collector2 = Collectors.groupingReduce(mId, HashMap::new, e -> 1, Integer::sum);
+        Collector<Integer, Map<Integer, Integer>> collector2 = Collectors.groupingBy(mId, HashMap::new).thenReducing(e -> 1, Integer::sum);
         Map<Integer, Integer> miResult = exerciseTerminalOps(data, s -> s.collect(collector2));
         assertEquals(miResult.keySet().size(), uniqueSize);
         for (Map.Entry<Integer, Integer> entry : miResult.entrySet())
--- a/test-ng/tests/org/openjdk/tests/java/util/stream/TabulatorsTest.java	Fri Mar 08 21:54:17 2013 -0800
+++ b/test-ng/tests/org/openjdk/tests/java/util/stream/TabulatorsTest.java	Sun Mar 10 17:25:18 2013 -0400
@@ -30,9 +30,12 @@
 import java.util.HashSet;
 import java.util.Map;
 import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.function.BinaryOperator;
 import java.util.function.Function;
 import java.util.function.Functions;
+import java.util.function.Predicate;
 import java.util.function.Supplier;
 import java.util.stream.Collector;
 import java.util.stream.Collectors;
@@ -45,7 +48,8 @@
 import org.testng.annotations.Test;
 
 import static java.util.stream.Collectors.groupingBy;
-import static java.util.stream.Collectors.groupingReduce;
+import static java.util.stream.Collectors.groupingByConcurrent;
+import static java.util.stream.Collectors.toCollection;
 import static java.util.stream.LambdaTestHelpers.assertContents;
 import static java.util.stream.LambdaTestHelpers.assertContentsUnordered;
 import static java.util.stream.LambdaTestHelpers.mDoubler;
@@ -56,15 +60,12 @@
  * @author Brian Goetz
  */
 public class TabulatorsTest extends OpTestCase {
-    // @@@ Test plan
-    // There are 8 versions of groupBy:
-    //   groupingBy: { supplier, not } x { downstream collect, collect-to-list }
-    //   groupingReduce: { supplier, not } x { reduce, map-reduce }
-    // There are 5 versions of partition:
-    //   partition(predicate)
-    //   partition(predicate, collectionSupplier)
-    //   partition(predicate, { mutableReduce, reduce, mapReduce })
-    // There are 4 versions of mappedTo:
+    // There are 4 versions of groupingBy:
+    //   groupingBy: { supplier, not } x { concurrent, not }
+    //   each version has raw, cascaded, and reducing forms
+    // There are 2 versions of partition (concurrent and not)
+    //   each version has raw, cascaded, and reducing forms
+    // There are 4 versions of mappedTo
     //   mappedTo(function, mapSupplier?, mergeFunction?)
     // Each variety needs at least one test
     // Plus a variety of multi-level tests (groupBy(..., partition), partition(..., groupBy))
@@ -73,7 +74,9 @@
 
 
     private static abstract class TabulationAssertion<T, U> {
-        abstract void assertValue(U value, Supplier<Stream<T>> source) throws ReflectiveOperationException;
+        abstract void assertValue(U value,
+                                  Supplier<Stream<T>> source,
+                                  boolean ordered) throws ReflectiveOperationException;
     }
 
     @SuppressWarnings({"rawtypes", "unchecked"})
@@ -90,16 +93,45 @@
             this.downstream = downstream;
         }
 
-        void assertValue(Map<K, V> map, Supplier<Stream<T>> source) throws ReflectiveOperationException {
-            assert clazz.isAssignableFrom(map.getClass());
+        void assertValue(Map<K, V> map,
+                         Supplier<Stream<T>> source,
+                         boolean ordered) throws ReflectiveOperationException {
+            if (!clazz.isAssignableFrom(map.getClass()))
+                fail(String.format("Class mismatch in GroupedMapAssertion: %s, %s", clazz, map.getClass()));
             assertContentsUnordered(map.keySet(), source.get().map(classifier).collect(Collectors.toSet()));
             for (Map.Entry<K,V> entry : map.entrySet()) {
                 K key = entry.getKey();
-                downstream.assertValue(entry.getValue(), () -> source.get().filter(e -> classifier.apply(e).equals(key)));
+                downstream.assertValue(entry.getValue(),
+                                       () -> source.get().filter(e -> classifier.apply(e).equals(key)),
+                                       ordered);
             }
         }
     }
 
+    static class PartitionAssertion<T> extends TabulationAssertion<T, Map<Boolean,T>> {
+        private final Class<? extends Map> clazz;
+        private final Predicate<T> predicate;
+        private final TabulationAssertion<T,T> downstream;
+
+        protected PartitionAssertion(Predicate<T> predicate,
+                                     Class<? extends Map> clazz,
+                                     TabulationAssertion<T, T> downstream) {
+            this.clazz = clazz;
+            this.predicate = predicate;
+            this.downstream = downstream;
+        }
+
+        void assertValue(Map<Boolean, T> map,
+                         Supplier<Stream<T>> source,
+                         boolean ordered) throws ReflectiveOperationException {
+            if (!clazz.isAssignableFrom(map.getClass()))
+                fail(String.format("Class mismatch in GroupedMapAssertion: %s, %s", clazz, map.getClass()));
+            assertEquals(2, map.size());
+            downstream.assertValue(map.get(true), () -> source.get().filter(predicate), ordered);
+            downstream.assertValue(map.get(false), () -> source.get().filter(predicate.negate()), ordered);
+        }
+    }
+
     @SuppressWarnings({"rawtypes", "unchecked"})
     static class CollectionAssertion<T> extends TabulationAssertion<T, Collection<T>> {
         private final Class<? extends Collection> clazz;
@@ -111,12 +143,13 @@
         }
 
         @Override
-        void assertValue(Collection<T> value, Supplier<Stream<T>> source) throws ReflectiveOperationException {
-            assert clazz.isAssignableFrom(value.getClass());
+        void assertValue(Collection<T> value, Supplier<Stream<T>> source, boolean ordered) throws ReflectiveOperationException {
+            if (!clazz.isAssignableFrom(value.getClass()))
+                fail(String.format("Class mismatch in CollectionAssertion: %s, %s", clazz, value.getClass()));
             Stream<T> stream = source.get();
             Collection<T> result = clazz.newInstance();
             stream.forEach(result::add);
-            if (StreamOpFlagTestHelper.isStreamOrdered(stream) && targetOrdered)
+            if (StreamOpFlagTestHelper.isStreamOrdered(stream) && targetOrdered && ordered)
                 assertContents(value, result);
             else
                 assertContentsUnordered(value, result);
@@ -133,33 +166,58 @@
         }
 
         @Override
-        void assertValue(U value, Supplier<Stream<T>> source) throws ReflectiveOperationException {
+        void assertValue(U value, Supplier<Stream<T>> source, boolean ordered) throws ReflectiveOperationException {
             assertEquals(value, source.get().map(mapper).reduce(reducer).get());
         }
     }
 
-    private<T, R> void exerciseTabulation(StreamTestData<Integer> data,
-                                          Collector<Integer, R> collector,
-                                          TabulationAssertion<Integer, R> assertion) throws ReflectiveOperationException {
+    private<T, R extends Map> void exerciseMapTabulation(StreamTestData<Integer> data,
+                                                         Collector<Integer, R> collector,
+                                                         TabulationAssertion<Integer, R> assertion) throws ReflectiveOperationException {
         R r = exerciseTerminalOps(data, s -> s.collect(collector));
-        assertion.assertValue(r, () -> data.stream());
+        assertion.assertValue(r, () -> data.stream(), true);
+        r = withData(data)
+                .terminal(s -> s.collectUnordered(collector))
+                .parallelEqualityAsserter(this::nestedMapEqualityAssertion)
+                .exercise();
+        assertion.assertValue(r, () -> data.stream(), false);
+    }
+
+    private void nestedMapEqualityAssertion(Object o1, Object o2) {
+        if (o1 instanceof Map) {
+            Map m1 = (Map) o1;
+            Map m2 = (Map) o2;
+            assertContentsUnordered(m1.keySet(), m2.keySet());
+            for (Object k : m1.keySet())
+                nestedMapEqualityAssertion(m1.get(k), m2.get(k));
+        }
+        else if (o1 instanceof Collection) {
+            assertContentsUnordered(((Collection) o1), ((Collection) o2));
+        }
+        else
+            assertEquals(o1, o2);
     }
 
     @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
-    public void testGroupBy(String name, StreamTestData<Integer> data) throws ReflectiveOperationException {
+    public void testSimpleGroupBy(String name, StreamTestData<Integer> data) throws ReflectiveOperationException {
         Function<Integer, Integer> classifier = i -> i % 3;
 
         // Single-level groupBy
-        exerciseTabulation(data,
-                           groupingBy(classifier),
-                           new GroupedMapAssertion<>(classifier, HashMap.class,
-                                                     new CollectionAssertion<Integer>(ArrayList.class, true)));
+        exerciseMapTabulation(data, groupingBy(classifier), new GroupedMapAssertion<>(classifier, HashMap.class,
+                                                                                      new CollectionAssertion<Integer>(ArrayList.class, true)));
+        exerciseMapTabulation(data, groupingByConcurrent(classifier), new GroupedMapAssertion<>(classifier, ConcurrentHashMap.class,
+                                                                                                new CollectionAssertion<Integer>(ArrayList.class, true)));
 
         // With explicit constructors
-        exerciseTabulation(data,
-                           groupingBy(classifier, TreeMap::new, HashSet::new),
-                           new GroupedMapAssertion<>(classifier, TreeMap.class,
-                                                     new CollectionAssertion<Integer>(HashSet.class, false)));
+        exerciseMapTabulation(data,
+                              groupingBy(classifier, TreeMap::new).then(toCollection((Supplier<Collection>) HashSet::new)),
+                              new GroupedMapAssertion<>(classifier, TreeMap.class,
+                                                        new CollectionAssertion<Integer>(HashSet.class, false)));
+        exerciseMapTabulation(data,
+                              groupingByConcurrent(classifier, ConcurrentSkipListMap::new)
+                                      .then(toCollection((Supplier<Collection>) HashSet::new)),
+                              new GroupedMapAssertion<>(classifier, ConcurrentSkipListMap.class,
+                                                        new CollectionAssertion<Integer>(HashSet.class, false)));
     }
 
     @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
@@ -168,18 +226,54 @@
         Function<Integer, Integer> classifier2 = i -> i % 23;
 
         // Two-level groupBy
-        exerciseTabulation(data,
-                           groupingBy(classifier, groupingBy(classifier2)),
-                           new GroupedMapAssertion<>(classifier, HashMap.class,
-                                                     new GroupedMapAssertion<>(classifier2, HashMap.class,
-                                                                               new CollectionAssertion<Integer>(ArrayList.class, true))));
+        exerciseMapTabulation(data,
+                              groupingBy(classifier).then(groupingBy(classifier2)),
+                              new GroupedMapAssertion<>(classifier, HashMap.class,
+                                                        new GroupedMapAssertion<>(classifier2, HashMap.class,
+                                                                                  new CollectionAssertion<Integer>(ArrayList.class, true))));
+        // with concurrent as upstream
+        exerciseMapTabulation(data,
+                              groupingByConcurrent(classifier).then(groupingBy(classifier2)),
+                              new GroupedMapAssertion<>(classifier, ConcurrentHashMap.class,
+                                                        new GroupedMapAssertion<>(classifier2, HashMap.class,
+                                                                                  new CollectionAssertion<Integer>(ArrayList.class, true))));
+        // with concurrent as downstream
+        exerciseMapTabulation(data,
+                              groupingBy(classifier).then(groupingByConcurrent(classifier2)),
+                              new GroupedMapAssertion<>(classifier, HashMap.class,
+                                                        new GroupedMapAssertion<>(classifier2, ConcurrentHashMap.class,
+                                                                                  new CollectionAssertion<Integer>(ArrayList.class, true))));
+        // with concurrent as upstream and downstream
+        exerciseMapTabulation(data,
+                              groupingByConcurrent(classifier).then(groupingByConcurrent(classifier2)),
+                              new GroupedMapAssertion<>(classifier, ConcurrentHashMap.class,
+                                                        new GroupedMapAssertion<>(classifier2, ConcurrentHashMap.class,
+                                                                                  new CollectionAssertion<Integer>(ArrayList.class, true))));
 
         // With explicit constructors
-        exerciseTabulation(data,
-                           groupingBy(classifier, TreeMap::new, groupingBy(classifier2, TreeMap::new, HashSet::new)),
-                           new GroupedMapAssertion<>(classifier, TreeMap.class,
-                                                     new GroupedMapAssertion<>(classifier2, TreeMap.class,
-                                                                               new CollectionAssertion<Integer>(HashSet.class, false))));
+        exerciseMapTabulation(data,
+                              groupingBy(classifier, TreeMap::new).then(groupingBy(classifier2, TreeMap::new).then(toCollection((Supplier<Collection>) HashSet::new))),
+                              new GroupedMapAssertion<>(classifier, TreeMap.class,
+                                                        new GroupedMapAssertion<>(classifier2, TreeMap.class,
+                                                                                  new CollectionAssertion<Integer>(HashSet.class, false))));
+        // with concurrent as upstream
+        exerciseMapTabulation(data,
+                              groupingByConcurrent(classifier, ConcurrentSkipListMap::new).then(groupingBy(classifier2, TreeMap::new)),
+                              new GroupedMapAssertion<>(classifier, ConcurrentSkipListMap.class,
+                                                        new GroupedMapAssertion<>(classifier2, TreeMap.class,
+                                                                                  new CollectionAssertion<Integer>(ArrayList.class, true))));
+        // with concurrent as downstream
+        exerciseMapTabulation(data,
+                              groupingBy(classifier, TreeMap::new).then(groupingByConcurrent(classifier2, ConcurrentSkipListMap::new)),
+                              new GroupedMapAssertion<>(classifier, TreeMap.class,
+                                                        new GroupedMapAssertion<>(classifier2, ConcurrentSkipListMap.class,
+                                                                                  new CollectionAssertion<Integer>(ArrayList.class, true))));
+        // with concurrent as upstream and downstream
+        exerciseMapTabulation(data,
+                              groupingByConcurrent(classifier, ConcurrentSkipListMap::new).then(groupingByConcurrent(classifier2, ConcurrentSkipListMap::new)),
+                              new GroupedMapAssertion<>(classifier, ConcurrentSkipListMap.class,
+                                                        new GroupedMapAssertion<>(classifier2, ConcurrentSkipListMap.class,
+                                                                                  new CollectionAssertion<Integer>(ArrayList.class, true))));
     }
 
     @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
@@ -187,27 +281,47 @@
         Function<Integer, Integer> classifier = i -> i % 3;
 
         // Single-level simple reduce
-        exerciseTabulation(data,
-                           groupingReduce(classifier, Integer::sum),
-                           new GroupedMapAssertion<>(classifier, HashMap.class,
-                                                     new ReduceAssertion<>(Functions.identity(), Integer::sum)));
+        exerciseMapTabulation(data,
+                              groupingBy(classifier).thenReducing(Integer::sum),
+                              new GroupedMapAssertion<>(classifier, HashMap.class,
+                                                        new ReduceAssertion<>(Functions.identity(), Integer::sum)));
+        // with concurrent
+        exerciseMapTabulation(data,
+                              groupingByConcurrent(classifier).thenReducing(Integer::sum),
+                              new GroupedMapAssertion<>(classifier, ConcurrentHashMap.class,
+                                                        new ReduceAssertion<>(Functions.identity(), Integer::sum)));
 
         // With explicit constructors
-        exerciseTabulation(data,
-                           groupingReduce(classifier, () -> new TreeMap<>(), Integer::sum),
-                           new GroupedMapAssertion<>(classifier, TreeMap.class,
-                                                     new ReduceAssertion<>(Functions.identity(), Integer::sum)));
+        exerciseMapTabulation(data,
+                              groupingBy(classifier, TreeMap::new).thenReducing(Integer::sum),
+                              new GroupedMapAssertion<>(classifier, TreeMap.class,
+                                                        new ReduceAssertion<>(Functions.identity(), Integer::sum)));
+        // with concurrent
+        exerciseMapTabulation(data,
+                              groupingByConcurrent(classifier, ConcurrentSkipListMap::new).thenReducing(Integer::sum),
+                              new GroupedMapAssertion<>(classifier, ConcurrentSkipListMap.class,
+                                                        new ReduceAssertion<>(Functions.identity(), Integer::sum)));
 
         // Single-level map-reduce
-        exerciseTabulation(data,
-                           groupingReduce(classifier, mDoubler, Integer::sum),
-                           new GroupedMapAssertion<>(classifier, HashMap.class,
-                                                     new ReduceAssertion<>(mDoubler, Integer::sum)));
+        exerciseMapTabulation(data,
+                              groupingBy(classifier).thenReducing(mDoubler, Integer::sum),
+                              new GroupedMapAssertion<>(classifier, HashMap.class,
+                                                        new ReduceAssertion<>(mDoubler, Integer::sum)));
+        // with concurrent
+        exerciseMapTabulation(data,
+                              groupingByConcurrent(classifier).thenReducing(mDoubler, Integer::sum),
+                              new GroupedMapAssertion<>(classifier, ConcurrentHashMap.class,
+                                                        new ReduceAssertion<>(mDoubler, Integer::sum)));
 
         // With explicit constructors
-        exerciseTabulation(data,
-                           groupingReduce(classifier, TreeMap::new, mDoubler, Integer::sum),
-                           new GroupedMapAssertion<>(classifier, TreeMap.class,
-                                                     new ReduceAssertion<>(mDoubler, Integer::sum)));
+        exerciseMapTabulation(data,
+                              groupingBy(classifier, TreeMap::new).thenReducing(mDoubler, Integer::sum),
+                              new GroupedMapAssertion<>(classifier, TreeMap.class,
+                                                        new ReduceAssertion<>(mDoubler, Integer::sum)));
+        // with concurrent
+        exerciseMapTabulation(data,
+                              groupingByConcurrent(classifier, ConcurrentSkipListMap::new).thenReducing(mDoubler, Integer::sum),
+                              new GroupedMapAssertion<>(classifier, ConcurrentSkipListMap.class,
+                                                        new ReduceAssertion<>(mDoubler, Integer::sum)));
     }
 }