OpenJDK / lambda / lambda / jdk
changeset 7597:c436657091ee
Consolidate Collectors and ConcurrentCollectors; consolidate many groupBy/partitionBy forms into fewer forms; more tests
author | briangoetz |
---|---|
date | Sun, 10 Mar 2013 17:25:18 -0400 |
parents | fb7b7275d220 |
children | a36b274075f5 |
files | src/share/classes/java/util/stream/Collector.java src/share/classes/java/util/stream/Collectors.java src/share/classes/java/util/stream/ConcurrentCollectors.java test-ng/tests/org/openjdk/tests/java/util/stream/ReduceByOpTest.java test-ng/tests/org/openjdk/tests/java/util/stream/TabulatorsTest.java |
diffstat | 5 files changed, 454 insertions(+), 350 deletions(-) [+] |
line wrap: on
line diff
--- a/src/share/classes/java/util/stream/Collector.java Fri Mar 08 21:54:17 2013 -0800 +++ b/src/share/classes/java/util/stream/Collector.java Sun Mar 10 17:25:18 2013 -0400 @@ -52,7 +52,6 @@ * @see Stream#collect(Collector) * @see Stream#collectUnordered(Collector) * @see Collectors - * @see ConcurrentCollectors * * @param <T> The type of input element to the collect operation * @param <R> THe result type of the collect operation
--- a/src/share/classes/java/util/stream/Collectors.java Fri Mar 08 21:54:17 2013 -0800 +++ b/src/share/classes/java/util/stream/Collectors.java Sun Mar 10 17:25:18 2013 -0400 @@ -28,6 +28,7 @@ import java.util.AbstractSet; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -39,6 +40,8 @@ import java.util.OptionalLong; import java.util.Set; import java.util.StringJoiner; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.function.BiConsumer; import java.util.function.BinaryOperator; import java.util.function.DoubleConsumer; @@ -200,37 +203,37 @@ * @return A {@code Collection} containing all the input elements, in encounter order */ public static<T, C extends Collection<T>> - Collector<T,C> toCollection(Supplier<C> collectionFactory) { + Collector<T, C> toCollection(Supplier<C> collectionFactory) { return leftCombining(collectionFactory, Collection::add, Collection::addAll); } /** - * Accumulate elements into a new {@code List}. + * Accumulate elements into a {@code List}. * * @param <T> The type of the input elements * @return A {@code List} containing all the input elements, in encounter order */ public static<T> - Collector<T,List<T>> toList() { + Collector<T, List<T>> toList() { // Consider a tree-based List? return toCollection(ArrayList::new); } /** - * Accumulate elements into a new {@code Set}. + * Accumulate elements into a {@code Set}. * * @param <T> The type of the input elements * @return A {@code Set} containing all the input elements */ public static<T> - Collector<T,Set<T>> toSet() { + Collector<T, Set<T>> toSet() { // @@@ Declare that the collector is NOT_ORDERED so the reduce op can declare NOT_ORDERED in // the terminal op flags return toCollection(HashSet::new); } /** - * Accumulate {@code String} elements into a new {@link StringBuilder}. + * Accumulate {@code String} elements into a {@link StringBuilder}. * * @return A {@code StringBuilder} containing all of the input elements concatenated in encounter order */ @@ -239,7 +242,7 @@ } /** - * Accumulate {@code String} elements into a new {@link StringJoiner}. + * Accumulate {@code String} elements into a {@link StringJoiner}, using the specified separator. * * @return A {@code StringJoiner} containing all of the input elements concatenated in encounter order */ @@ -259,7 +262,19 @@ /** * Given a function from {@code T} to {@code U}, adapt a {@code Collector<U,R>} to a - * {@code Collector<T,R>} which applies the provided function to each input element. + * {@code Collector<T,R>} which operates by applying the provided function to each input + * element before the accumulation step. + * + * @apiNote + * The {@code mapping()} collectors are most useful when used in a multi-level collection + * following a {@code groupingBy} or {@code partitioningBy} collection. + * For example, given a stream of {@code Person}, to accumulate the set of last names in + * each city: + * <pre> + * Map<City, Set<String>> lastNamesByCity + * = people.stream().collect(groupingBy(Person::getCity) + * .then(mapping(Person::getLastName, toSet()))); + * </pre> * * @param <T> Type of values to be accepted * @param <U> Type of values accepted by provided collector @@ -300,65 +315,238 @@ downstream.combiner(), downstream.isConcurrent()); } - public static <T, K> - Collector<T, Map<K, Collection<T>>> groupingBy(Function<? super T, ? extends K> classifier) { - return Collectors.groupingBy(classifier, HashMap::new, ArrayList::new); + + public static<T, K> GroupingCollector<T, K> groupingBy(Function<? super T, ? extends K> classifier) { + return groupingBy(classifier, HashMap::new); } - public static <T, K, C extends Collection<T>, M extends Map<K, C>> - Collector<T, M> groupingBy(Function<? super T, ? extends K> classifier, - Supplier<M> mapFactory, - Supplier<C> rowFactory) { - return groupingBy(classifier, mapFactory, toCollection(rowFactory)); + public static<T, K> GroupingCollector<T, K> groupingBy(Function<? super T, ? extends K> classifier, + Supplier<? extends Map> mapFactory) { + return new GroupingCollector<T, K>() { + @Override + public <D> Collector<T, Map<K, D>> then(Collector<T, D> downstream) { + Supplier<D> downstreamSupplier = downstream.resultSupplier(); + BiConsumer<D, T> downstreamAccumulator = downstream.accumulator(); + BinaryOperator<D> downstreamCombiner = downstream.combiner(); + BiConsumer<Map<K, D>, T> accumulator = (m, t) -> { + K key = Objects.requireNonNull(classifier.apply(t), "element cannot be mapped to a null key"); + downstreamAccumulator.accept(m.computeIfAbsent(key, k -> downstreamSupplier.get()), t); + }; + return new CollectorImpl<>((Supplier<Map<K, D>>) mapFactory, accumulator, + Collectors.<K, D, Map<K, D>>leftMapMerger(downstreamCombiner)); + } + + @Override + public Collector<T, Map<K, T>> thenReducing(BinaryOperator<T> reducer) { + return thenReducing(Functions.identity(), reducer); + } + + @Override + public <U> Collector<T, Map<K, U>> thenReducing(Function<? super T, ? extends U> mapper, + BinaryOperator<U> reducer) { + BiConsumer<Map<K, U>, T> accumulator = (map, value) -> { + map.merge(Objects.requireNonNull(classifier.apply(value), "element cannot be mapped to a null key"), + mapper.apply(value), reducer); + }; + return new CollectorImpl<>((Supplier<Map<K, U>>) mapFactory, accumulator, + Collectors.<K, U, Map<K, U>>leftMapMerger(reducer)); + } + + @Override + public Supplier<Map<K, Collection<T>>> resultSupplier() { + return (Supplier<Map<K, Collection<T>>>) mapFactory; + } + + @Override + public BiConsumer<Map<K, Collection<T>>, T> accumulator() { + return (map, value) -> map.computeIfAbsent(classifier.apply(value), k -> new ArrayList<T>()).add(value); + } + + @Override + public BinaryOperator<Map<K, Collection<T>>> combiner() { + return Collectors.<K, Collection<T>, Map<K, Collection<T>>>leftMapMerger((left, right) -> { + left.addAll(right); + return left; + }); + } + }; } - public static <T, K, D> Collector<T, Map<K, D>> groupingBy(Function<? super T, ? extends K> classifier, - Collector<T, D> downstream) { - return groupingBy(classifier, HashMap::new, downstream); + public static<T, K> GroupingCollector<T, K> groupingByConcurrent(Function<? super T, ? extends K> classifier) { + return groupingByConcurrent(classifier, ConcurrentHashMap::new); } - public static <T, K, D, M extends Map<K, D>> Collector<T, M> groupingBy(Function<? super T, ? extends K> classifier, - Supplier<M> mapFactory, - Collector<T, D> downstream) { - Supplier<D> downstreamSupplier = downstream.resultSupplier(); - BiConsumer<D, T> downstreamAccumulator = downstream.accumulator(); - BinaryOperator<D> downstreamCombiner = downstream.combiner(); - BiConsumer<M, T> accumulator = (m, t) -> { - K key = Objects.requireNonNull(classifier.apply(t), "element cannot be mapped to a null key"); - downstreamAccumulator.accept(m.computeIfAbsent(key, k -> downstreamSupplier.get()), t); + public static<T, K> GroupingCollector<T, K> groupingByConcurrent(Function<? super T, ? extends K> classifier, + Supplier<? extends ConcurrentMap> mapFactory) { + return new GroupingCollector<T, K>() { + @Override + public <D> Collector<T, Map<K, D>> then(Collector<T, D> downstream) { + Supplier<D> downstreamSupplier = downstream.resultSupplier(); + BiConsumer<D, T> downstreamAccumulator = downstream.accumulator(); + BiConsumer<D, T> wrappedAccumulator + = downstream.isConcurrent() + ? downstreamAccumulator + : (d, t) -> { synchronized(d) { downstreamAccumulator.accept(d, t); } }; + BiConsumer<Map<K, D>, T> accumulator = (m, t) -> { + K key = Objects.requireNonNull(classifier.apply(t), "element cannot be mapped to a null key"); + wrappedAccumulator.accept(m.computeIfAbsent(key, k -> downstreamSupplier.get()), t); + }; + return new Collectors.CollectorImpl<>((Supplier) mapFactory, accumulator, + leftMapMerger(downstream.combiner()), true); + } + + @Override + public Collector<T, Map<K, T>> thenReducing(BinaryOperator<T> reducer) { + return thenReducing(Functions.identity(), reducer); + } + + @Override + public <U> Collector<T, Map<K, U>> thenReducing(Function<? super T, ? extends U> mapper, + BinaryOperator<U> reducer) { + BiConsumer<Map<K, U>, T> accumulator + = (map, value) -> map.merge(classifier.apply(value), mapper.apply(value), reducer); + return new Collectors.CollectorImpl<>((Supplier) mapFactory, accumulator, leftMapMerger(reducer), true); + } + + @Override + public Supplier<Map<K, Collection<T>>> resultSupplier() { + return (Supplier) mapFactory; + } + + @Override + public BiConsumer<Map<K, Collection<T>>, T> accumulator() { + return (map, value) -> map.computeIfAbsent(classifier.apply(value), k -> new ArrayList<T>()).add(value); + } + + @Override + public BinaryOperator<Map<K, Collection<T>>> combiner() { + return Collectors.<K, Collection<T>, Map<K, Collection<T>>>leftMapMerger((left, right) -> { + left.addAll(right); + return left; + }); + } }; - return new CollectorImpl<>(mapFactory, accumulator, Collectors.<K, D, M>leftMapMerger(downstreamCombiner)); } - public static <T, K> Collector<T, Map<K,T>> groupingReduce(Function<? super T, ? extends K> classifier, - BinaryOperator<T> reducer) { - return groupingReduce(classifier, HashMap::new, Functions.identity(), reducer); + public static<T> PartitioningCollector<T> partitioningBy(Predicate<? super T> predicate) { + return new PartitioningCollector<T>() { + @Override + public <D> Collector<T, Map<Boolean, D>> then(Collector<T, D> downstream) { + BiConsumer<D, T> downstreamAccumulator = downstream.accumulator(); + BiConsumer<Map<Boolean, D>, T> accumulator = (result, t) -> { + Partition<D> asPartition = ((Partition<D>) result); + downstreamAccumulator.accept(predicate.test(t) ? asPartition.forTrue : asPartition.forFalse, t); + }; + return new CollectorImpl<>(() -> new Partition<>(downstream.resultSupplier().get(), + downstream.resultSupplier().get()), + accumulator, leftPartitionMerger(downstream.combiner())); + } + + @Override + public Collector<T, Map<Boolean, T>> thenReducing(T identity, BinaryOperator<T> reducer) { + return thenReducing(identity, Functions.identity(), reducer); + } + + @Override + public <U> Collector<T, Map<Boolean, U>> thenReducing(U identity, + Function<? super T, ? extends U> mapper, + BinaryOperator<U> reducer) { + BiConsumer<Map<Boolean, U>, T> accumulator = (result, t) -> { + Partition<U> asPartition = ((Partition<U>) result); + if (predicate.test(t)) + asPartition.forTrue = reducer.apply(asPartition.forTrue, mapper.apply(t)); + else + asPartition.forFalse = reducer.apply(asPartition.forFalse, mapper.apply(t)); + }; + return new CollectorImpl<>(() -> new Partition<>(identity, identity), + accumulator, leftPartitionMerger(reducer)); + } + + @Override + public Supplier<Map<Boolean, Collection<T>>> resultSupplier() { + return () -> new Partition<Collection<T>>(new ArrayList<>(), new ArrayList<>()); + } + + @Override + public BiConsumer<Map<Boolean, Collection<T>>, T> accumulator() { + return (result, t) -> { + Partition<Collection<T>> asPartition = ((Partition<Collection<T>>) result); + (predicate.test(t) ? asPartition.forTrue : asPartition.forFalse).add(t); + }; + } + + @Override + public BinaryOperator<Map<Boolean, Collection<T>>> combiner() { + return leftPartitionMerger((Collection<T> left, Collection<T> right) -> { left.addAll(right); return left; }); + } + }; } - public static <T, K, M extends Map<K, T>> Collector<T, M> - groupingReduce(Function<? super T, ? extends K> classifier, - Supplier<M> mapFactory, - BinaryOperator<T> reducer) { - return groupingReduce(classifier, mapFactory, Functions.identity(), reducer); - } + public static<T> PartitioningCollector<T> partitioningByConcurrent(Predicate<? super T> predicate) { + return new PartitioningCollector<T>() { + @Override + public <D> Collector<T, Map<Boolean, D>> then(Collector<T, D> downstream) { + BiConsumer<D, T> downstreamAccumulator = downstream.accumulator(); + BiConsumer<D, T> wrappedAccumulator + = downstream.isConcurrent() + ? downstreamAccumulator + : (d, t) -> { synchronized(d) { downstreamAccumulator.accept(d, t); } }; + BiConsumer<Map<Boolean, D>, T> accumulator = (result, t) -> { + Collectors.Partition<D> asPartition = ((Collectors.Partition<D>) result); + wrappedAccumulator.accept(predicate.test(t) ? asPartition.forTrue : asPartition.forFalse, t); + }; + return new Collectors.CollectorImpl<>(() -> new Collectors.Partition<>(downstream.resultSupplier().get(), + downstream.resultSupplier().get()), + accumulator, Collectors.leftPartitionMerger(downstream.combiner()), true); + } - public static <T, K, D> Collector<T, Map<K,D>> - groupingReduce(Function<? super T, ? extends K> classifier, - Function<? super T, ? extends D> mapper, - BinaryOperator<D> reducer) { - return groupingReduce(classifier, HashMap::new, mapper, reducer); - } + @Override + public Collector<T, Map<Boolean, T>> thenReducing(T identity, BinaryOperator<T> reducer) { + return thenReducing(identity, Functions.identity(), reducer); + } - public static <T, K, D, M extends Map<K, D>> Collector<T, M> - groupingReduce(Function<? super T, ? extends K> classifier, - Supplier<M> mapFactory, - Function<? super T, ? extends D> mapper, - BinaryOperator<D> reducer) { - BiConsumer<M, T> accumulator = (map, value) -> { - map.merge(Objects.requireNonNull(classifier.apply(value), "element cannot be mapped to a null key"), - mapper.apply(value), reducer); + @Override + public <U> Collector<T, Map<Boolean, U>> thenReducing(U identity, + Function<? super T, ? extends U> mapper, + BinaryOperator<U> reducer) { + final Object trueLock = new Object(); + final Object falseLock = new Object(); + BiConsumer<Map<Boolean, U>, T> accumulator = (result, t) -> { + Collectors.Partition<U> asPartition = ((Collectors.Partition<U>) result); + if (predicate.test(t)) { + synchronized (trueLock) { + asPartition.forTrue = reducer.apply(asPartition.forTrue, mapper.apply(t)); + } + } + else { + synchronized (falseLock) { + asPartition.forFalse = reducer.apply(asPartition.forFalse, mapper.apply(t)); + } + } + }; + return new Collectors.CollectorImpl<>(() -> new Collectors.Partition<>(identity, identity), + accumulator, Collectors.leftPartitionMerger(reducer), true); + } + + @Override + public Supplier<Map<Boolean, Collection<T>>> resultSupplier() { + return () -> new Partition<Collection<T>>(Collections.synchronizedList(new ArrayList<>()), + Collections.synchronizedList(new ArrayList<>())); + } + + @Override + public BiConsumer<Map<Boolean, Collection<T>>, T> accumulator() { + return (result, t) -> { + Partition<Collection<T>> asPartition = ((Partition<Collection<T>>) result); + (predicate.test(t) ? asPartition.forTrue : asPartition.forFalse).add(t); + }; + } + + @Override + public BinaryOperator<Map<Boolean, Collection<T>>> combiner() { + return leftPartitionMerger((Collection<T> left, Collection<T> right) -> { left.addAll(right); return left; }); + } }; - return new CollectorImpl<>(mapFactory, accumulator, Collectors.<K, D, M>leftMapMerger(reducer)); } public static <T, U> Collector<T, Map<T,U>> toMap(Function<? super T, ? extends U> mapper) { @@ -372,6 +560,18 @@ return new CollectorImpl<>(mapSupplier, accumulator, leftMapMerger(mergeFunction)); } + public static <T, U> Collector<T, ConcurrentMap<T,U>> toConcurrentMap(Function<? super T, ? extends U> mapper) { + return toConcurrentMap(mapper, ConcurrentHashMap::new, throwingMerger()); + } + + public static <T, U, M extends ConcurrentMap<T, U>> + Collector<T, M> toConcurrentMap(Function<? super T, ? extends U> mapper, + Supplier<M> mapSupplier, + BinaryOperator<U> mergeFunction) { + BiConsumer<M, T> accumulator = (map, value) -> map.merge(value, mapper.apply(value), mergeFunction); + return new Collectors.CollectorImpl<>(mapSupplier, accumulator, Collectors.leftMapMerger(mergeFunction), true); + } + static final class Partition<T> extends AbstractMap<Boolean, T> implements Map<Boolean, T> { T forTrue; T forFalse; @@ -435,16 +635,6 @@ } } - public static<T> Collector<T, Map<Boolean, Collection<T>>> partitioningBy(Predicate<T> predicate) { - return partitioningBy(predicate, ArrayList::new); - } - - public static<T, C extends Collection<T>> Collector<T, Map<Boolean, C>> - partitioningBy(Predicate<T> predicate, - Supplier<C> rowFactory) { - return partitioningBy(predicate, toCollection(rowFactory)); - } - static<D> BinaryOperator<Map<Boolean, D>> leftPartitionMerger(BinaryOperator<D> op) { return (m1, m2) -> { Partition<D> left = (Partition<D>) m1; @@ -455,42 +645,6 @@ }; } - public static<T, D> Collector<T, Map<Boolean, D>> - partitioningBy(Predicate<T> predicate, - Collector<T, D> downstream) { - BiConsumer<D, T> downstreamAccumulator = downstream.accumulator(); - BiConsumer<Map<Boolean, D>, T> accumulator = (result, t) -> { - Partition<D> asPartition = ((Partition<D>) result); - downstreamAccumulator.accept(predicate.test(t) ? asPartition.forTrue : asPartition.forFalse, t); - }; - return new CollectorImpl<>(() -> new Partition<>(downstream.resultSupplier().get(), - downstream.resultSupplier().get()), - accumulator, leftPartitionMerger(downstream.combiner())); - } - - public static<T> Collector<T, Map<Boolean, T>> - partitioningReduce(Predicate<T> predicate, - T identity, - BinaryOperator<T> reducer) { - return partitioningReduce(predicate, identity, Functions.identity(), reducer); - } - - public static<T, U> Collector<T, Map<Boolean, U>> - partitioningReduce(Predicate<T> predicate, - U identity, - Function<T, U> mapper, - BinaryOperator<U> reducer) { - BiConsumer<Map<Boolean, U>, T> accumulator = (result, t) -> { - Partition<U> asPartition = ((Partition<U>) result); - if (predicate.test(t)) - asPartition.forTrue = reducer.apply(asPartition.forTrue, mapper.apply(t)); - else - asPartition.forFalse = reducer.apply(asPartition.forFalse, mapper.apply(t)); - }; - return new CollectorImpl<>(() -> new Partition<>(identity, identity), - accumulator, leftPartitionMerger(reducer)); - } - public static final class LongStatistics implements LongConsumer, IntConsumer { private long count; private long sum; @@ -609,4 +763,31 @@ return new DoubleCollectorImpl<>(DoubleStatistics::new, DoubleStatistics::accept, (l, r) -> { l.combine(r); return l; }); } + + /** + * GroupingCollector + */ + public static interface GroupingCollector<T, K> extends Collector<T, Map<K, Collection<T>>> { + + <D> Collector<T, Map<K, D>> then(Collector<T, D> downstream); + + Collector<T, Map<K, T>> thenReducing(BinaryOperator<T> reducer); + + <U> Collector<T, Map<K, U>> thenReducing(Function<? super T, ? extends U> mapper, + BinaryOperator<U> reducer); + } + + /** + * PartitioningCollector + */ + public static interface PartitioningCollector<T> extends Collector<T, Map<Boolean, Collection<T>>> { + + <D> Collector<T, Map<Boolean, D>> then(Collector<T, D> downstream); + + Collector<T, Map<Boolean, T>> thenReducing(T identity, BinaryOperator<T> reducer); + + <U> Collector<T, Map<Boolean, U>> thenReducing(U identity, + Function<? super T, ? extends U> mapper, + BinaryOperator<U> reducer); + } }
--- a/src/share/classes/java/util/stream/ConcurrentCollectors.java Fri Mar 08 21:54:17 2013 -0800 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,188 +0,0 @@ -/* - * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved. - * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. - * - * This code is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License version 2 only, as - * published by the Free Software Foundation. Oracle designates this - * particular file as subject to the "Classpath" exception as provided - * by Oracle in the LICENSE file that accompanied this code. - * - * This code is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - * version 2 for more details (a copy is included in the LICENSE file that - * accompanied this code). - * - * You should have received a copy of the GNU General Public License version - * 2 along with this work; if not, write to the Free Software Foundation, - * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. - * - * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA - * or visit www.oracle.com if you need additional information or have any - * questions. - */ -package java.util.stream; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.Map; -import java.util.Objects; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.function.BiConsumer; -import java.util.function.BinaryOperator; -import java.util.function.Function; -import java.util.function.Functions; -import java.util.function.Predicate; -import java.util.function.Supplier; - -/** - * ConcurrentReducers - * - * @since 1.8 - */ -public final class ConcurrentCollectors { - - private ConcurrentCollectors() {} - - public static <T, K> - Collector<T, ConcurrentMap<K, Collection<T>>> groupingBy(Function<? super T, ? extends K> classifier) { - return groupingBy(classifier, ConcurrentHashMap::new, ArrayList::new); - } - - public static <T, K, C extends Collection<T>, M extends ConcurrentMap<K, C>> - Collector<T, M> groupingBy(Function<? super T, ? extends K> classifier, - Supplier<M> mapFactory, - Supplier<C> rowFactory) { - return groupingBy(classifier, mapFactory, Collectors.toCollection(rowFactory)); - } - - public static <T, K, D> Collector<T, ConcurrentMap<K, D>> groupingBy(Function<? super T, ? extends K> classifier, - Collector<T, D> downstream) { - return groupingBy(classifier, (Supplier<ConcurrentMap<K, D>>) ConcurrentHashMap::new, downstream); - } - - static <T, K, D, M extends ConcurrentMap<K, D>> Collector<T, M> groupingBy(Function<? super T, ? extends K> classifier, - Supplier<M> mapFactory, - Collector<T, D> downstream) { - Supplier<D> downstreamSupplier = downstream.resultSupplier(); - BiConsumer<D, T> downstreamAccumulator = downstream.accumulator(); - BiConsumer<D, T> wrappedAccumulator - = downstream.isConcurrent() - ? downstreamAccumulator - : (d, t) -> { synchronized(d) { downstreamAccumulator.accept(d, t); } }; - BiConsumer<M, T> accumulator = (m, t) -> { - K key = Objects.requireNonNull(classifier.apply(t), "element cannot be mapped to a null key"); - wrappedAccumulator.accept(m.computeIfAbsent(key, k -> downstreamSupplier.get()), t); - }; - return new Collectors.CollectorImpl<>(mapFactory, accumulator, - Collectors.leftMapMerger(downstream.combiner()), true); - } - - public static <T, K> Collector<T, ConcurrentMap<K,T>> groupingReduce(Function<? super T, ? extends K> classifier, - BinaryOperator<T> reducer) { - return groupingReduce(classifier, ConcurrentHashMap::new, Functions.identity(), reducer); - } - - public static <T, K, M extends ConcurrentMap<K, T>> - Collector<T, M> groupingReduce(Function<? super T, ? extends K> classifier, - Supplier<M> mapFactory, - BinaryOperator<T> reducer) { - return groupingReduce(classifier, mapFactory, Functions.identity(), reducer); - } - - public static <T, K, D> Collector<T, ConcurrentMap<K,D>> groupingReduce(Function<? super T, ? extends K> classifier, - Function<? super T, ? extends D> mapper, - BinaryOperator<D> reducer) { - return groupingReduce(classifier, (Supplier<ConcurrentMap<K, D>>) ConcurrentHashMap::new, mapper, reducer); - } - - public static <T, K, D, M extends ConcurrentMap<K, D>> - Collector<T, M> groupingReduce(Function<? super T, ? extends K> classifier, - Supplier<M> mapFactory, - Function<? super T, ? extends D> mapper, - BinaryOperator<D> reducer) { - BiConsumer<M, T> accumulator = (map, value) -> map.merge(classifier.apply(value), mapper.apply(value), reducer); - return new Collectors.CollectorImpl<>(mapFactory, accumulator, Collectors.leftMapMerger(reducer), true); - } - - - public static <T, U> Collector<T, ConcurrentMap<T,U>> joiningWith(Function<? super T, ? extends U> mapper) { - return joiningWith(mapper, Collectors.throwingMerger()); - } - - public static <T, U> Collector<T, ConcurrentMap<T,U>> joiningWith(Function<? super T, ? extends U> mapper, - BinaryOperator<U> mergeFunction) { - return joiningWith(mapper, mergeFunction, ConcurrentHashMap::new); - } - - public static <T, U, M extends ConcurrentMap<T, U>> Collector<T, M> joiningWith(Function<? super T, ? extends U> mapper, - Supplier<M> mapSupplier) { - return joiningWith(mapper, Collectors.throwingMerger(), mapSupplier); - } - - public static <T, U, M extends ConcurrentMap<T, U>> Collector<T, M> joiningWith(Function<? super T, ? extends U> mapper, - BinaryOperator<U> mergeFunction, - Supplier<M> mapSupplier) { - BiConsumer<M, T> accumulator = (map, value) -> map.merge(value, mapper.apply(value), mergeFunction); - return new Collectors.CollectorImpl<>(mapSupplier, accumulator, Collectors.leftMapMerger(mergeFunction), true); - } - - - public static<T> Collector<T, Map<Boolean, Collection<T>>> partitioningBy(Predicate<T> predicate) { - return partitioningBy(predicate, ArrayList::new); - } - - public static<T, C extends Collection<T>> Collector<T, Map<Boolean, C>> - partitioningBy(Predicate<T> predicate, - Supplier<C> rowFactory) { - return partitioningBy(predicate, Collectors.toCollection(rowFactory)); - } - - public static<T, D> Collector<T, Map<Boolean, D>> - partitioningBy(Predicate<T> predicate, - Collector<T, D> downstream) { - BiConsumer<D, T> downstreamAccumulator = downstream.accumulator(); - BiConsumer<D, T> wrappedAccumulator - = downstream.isConcurrent() - ? downstreamAccumulator - : (d, t) -> { synchronized(d) { downstreamAccumulator.accept(d, t); } }; - BiConsumer<Map<Boolean, D>, T> accumulator = (result, t) -> { - Collectors.Partition<D> asPartition = ((Collectors.Partition<D>) result); - wrappedAccumulator.accept(predicate.test(t) ? asPartition.forTrue : asPartition.forFalse, t); - }; - return new Collectors.CollectorImpl<>(() -> new Collectors.Partition<>(downstream.resultSupplier().get(), - downstream.resultSupplier().get()), - accumulator, Collectors.leftPartitionMerger(downstream.combiner()), true); - } - - public static<T> Collector<T, Map<Boolean, T>> partitioningReduce(Predicate<T> predicate, - T identity, - BinaryOperator<T> reducer) { - return partitioningReduce(predicate, identity, Functions.identity(), reducer); - } - - public static<T, U> Collector<T, Map<Boolean, U>> partitioningReduce(Predicate<T> predicate, - U identity, - Function<T, U> mapper, - BinaryOperator<U> reducer) { - final Object trueLock = new Object(); - final Object falseLock = new Object(); - BiConsumer<Map<Boolean, U>, T> accumulator = (result, t) -> { - Collectors.Partition<U> asPartition = ((Collectors.Partition<U>) result); - if (predicate.test(t)) { - synchronized (trueLock) { - asPartition.forTrue = reducer.apply(asPartition.forTrue, mapper.apply(t)); - } - } - else { - synchronized (falseLock) { - asPartition.forFalse = reducer.apply(asPartition.forFalse, mapper.apply(t)); - } - } - }; - return new Collectors.CollectorImpl<>(() -> new Collectors.Partition<>(identity, identity), - accumulator, Collectors.leftPartitionMerger(reducer), true); - } -}
--- a/test-ng/tests/org/openjdk/tests/java/util/stream/ReduceByOpTest.java Fri Mar 08 21:54:17 2013 -0800 +++ b/test-ng/tests/org/openjdk/tests/java/util/stream/ReduceByOpTest.java Sun Mar 10 17:25:18 2013 -0400 @@ -50,10 +50,8 @@ @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class) public void testOps(String name, StreamTestData<Integer> data) { Map<Boolean,Collection<Integer>> gbResult = data.stream().collect(Collectors.groupingBy(Functions.forPredicate(pEven, true, false))); - Collector<Integer, Map<Boolean, Integer>> collector = Collectors.groupingReduce(Functions.forPredicate(pEven, true, false), - HashMap::new, - Functions.identity(), rPlus); - Map<Boolean,Integer> result = data.stream().collect(collector); + Collector<Integer, Map<Boolean, Integer>> collector = Collectors.groupingBy(Functions.forPredicate(pEven, true, false), HashMap::new).thenReducing(rPlus); + Map<Boolean, Integer> result = data.stream().collect(collector); assertEquals(result.size(), gbResult.size()); for (Map.Entry<Boolean, Integer> entry : result.entrySet()) { Boolean key = entry.getKey(); @@ -63,7 +61,7 @@ int uniqueSize = data.into(new HashSet<Integer>()).size(); Map<Integer, Collection<Integer>> mgResult = exerciseTerminalOps(data, s -> s.collect(Collectors.groupingBy(mId))); - Collector<Integer, Map<Integer, Integer>> collector2 = Collectors.groupingReduce(mId, HashMap::new, e -> 1, Integer::sum); + Collector<Integer, Map<Integer, Integer>> collector2 = Collectors.groupingBy(mId, HashMap::new).thenReducing(e -> 1, Integer::sum); Map<Integer, Integer> miResult = exerciseTerminalOps(data, s -> s.collect(collector2)); assertEquals(miResult.keySet().size(), uniqueSize); for (Map.Entry<Integer, Integer> entry : miResult.entrySet())
--- a/test-ng/tests/org/openjdk/tests/java/util/stream/TabulatorsTest.java Fri Mar 08 21:54:17 2013 -0800 +++ b/test-ng/tests/org/openjdk/tests/java/util/stream/TabulatorsTest.java Sun Mar 10 17:25:18 2013 -0400 @@ -30,9 +30,12 @@ import java.util.HashSet; import java.util.Map; import java.util.TreeMap; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentSkipListMap; import java.util.function.BinaryOperator; import java.util.function.Function; import java.util.function.Functions; +import java.util.function.Predicate; import java.util.function.Supplier; import java.util.stream.Collector; import java.util.stream.Collectors; @@ -45,7 +48,8 @@ import org.testng.annotations.Test; import static java.util.stream.Collectors.groupingBy; -import static java.util.stream.Collectors.groupingReduce; +import static java.util.stream.Collectors.groupingByConcurrent; +import static java.util.stream.Collectors.toCollection; import static java.util.stream.LambdaTestHelpers.assertContents; import static java.util.stream.LambdaTestHelpers.assertContentsUnordered; import static java.util.stream.LambdaTestHelpers.mDoubler; @@ -56,15 +60,12 @@ * @author Brian Goetz */ public class TabulatorsTest extends OpTestCase { - // @@@ Test plan - // There are 8 versions of groupBy: - // groupingBy: { supplier, not } x { downstream collect, collect-to-list } - // groupingReduce: { supplier, not } x { reduce, map-reduce } - // There are 5 versions of partition: - // partition(predicate) - // partition(predicate, collectionSupplier) - // partition(predicate, { mutableReduce, reduce, mapReduce }) - // There are 4 versions of mappedTo: + // There are 4 versions of groupingBy: + // groupingBy: { supplier, not } x { concurrent, not } + // each version has raw, cascaded, and reducing forms + // There are 2 versions of partition (concurrent and not) + // each version has raw, cascaded, and reducing forms + // There are 4 versions of mappedTo // mappedTo(function, mapSupplier?, mergeFunction?) // Each variety needs at least one test // Plus a variety of multi-level tests (groupBy(..., partition), partition(..., groupBy)) @@ -73,7 +74,9 @@ private static abstract class TabulationAssertion<T, U> { - abstract void assertValue(U value, Supplier<Stream<T>> source) throws ReflectiveOperationException; + abstract void assertValue(U value, + Supplier<Stream<T>> source, + boolean ordered) throws ReflectiveOperationException; } @SuppressWarnings({"rawtypes", "unchecked"}) @@ -90,16 +93,45 @@ this.downstream = downstream; } - void assertValue(Map<K, V> map, Supplier<Stream<T>> source) throws ReflectiveOperationException { - assert clazz.isAssignableFrom(map.getClass()); + void assertValue(Map<K, V> map, + Supplier<Stream<T>> source, + boolean ordered) throws ReflectiveOperationException { + if (!clazz.isAssignableFrom(map.getClass())) + fail(String.format("Class mismatch in GroupedMapAssertion: %s, %s", clazz, map.getClass())); assertContentsUnordered(map.keySet(), source.get().map(classifier).collect(Collectors.toSet())); for (Map.Entry<K,V> entry : map.entrySet()) { K key = entry.getKey(); - downstream.assertValue(entry.getValue(), () -> source.get().filter(e -> classifier.apply(e).equals(key))); + downstream.assertValue(entry.getValue(), + () -> source.get().filter(e -> classifier.apply(e).equals(key)), + ordered); } } } + static class PartitionAssertion<T> extends TabulationAssertion<T, Map<Boolean,T>> { + private final Class<? extends Map> clazz; + private final Predicate<T> predicate; + private final TabulationAssertion<T,T> downstream; + + protected PartitionAssertion(Predicate<T> predicate, + Class<? extends Map> clazz, + TabulationAssertion<T, T> downstream) { + this.clazz = clazz; + this.predicate = predicate; + this.downstream = downstream; + } + + void assertValue(Map<Boolean, T> map, + Supplier<Stream<T>> source, + boolean ordered) throws ReflectiveOperationException { + if (!clazz.isAssignableFrom(map.getClass())) + fail(String.format("Class mismatch in GroupedMapAssertion: %s, %s", clazz, map.getClass())); + assertEquals(2, map.size()); + downstream.assertValue(map.get(true), () -> source.get().filter(predicate), ordered); + downstream.assertValue(map.get(false), () -> source.get().filter(predicate.negate()), ordered); + } + } + @SuppressWarnings({"rawtypes", "unchecked"}) static class CollectionAssertion<T> extends TabulationAssertion<T, Collection<T>> { private final Class<? extends Collection> clazz; @@ -111,12 +143,13 @@ } @Override - void assertValue(Collection<T> value, Supplier<Stream<T>> source) throws ReflectiveOperationException { - assert clazz.isAssignableFrom(value.getClass()); + void assertValue(Collection<T> value, Supplier<Stream<T>> source, boolean ordered) throws ReflectiveOperationException { + if (!clazz.isAssignableFrom(value.getClass())) + fail(String.format("Class mismatch in CollectionAssertion: %s, %s", clazz, value.getClass())); Stream<T> stream = source.get(); Collection<T> result = clazz.newInstance(); stream.forEach(result::add); - if (StreamOpFlagTestHelper.isStreamOrdered(stream) && targetOrdered) + if (StreamOpFlagTestHelper.isStreamOrdered(stream) && targetOrdered && ordered) assertContents(value, result); else assertContentsUnordered(value, result); @@ -133,33 +166,58 @@ } @Override - void assertValue(U value, Supplier<Stream<T>> source) throws ReflectiveOperationException { + void assertValue(U value, Supplier<Stream<T>> source, boolean ordered) throws ReflectiveOperationException { assertEquals(value, source.get().map(mapper).reduce(reducer).get()); } } - private<T, R> void exerciseTabulation(StreamTestData<Integer> data, - Collector<Integer, R> collector, - TabulationAssertion<Integer, R> assertion) throws ReflectiveOperationException { + private<T, R extends Map> void exerciseMapTabulation(StreamTestData<Integer> data, + Collector<Integer, R> collector, + TabulationAssertion<Integer, R> assertion) throws ReflectiveOperationException { R r = exerciseTerminalOps(data, s -> s.collect(collector)); - assertion.assertValue(r, () -> data.stream()); + assertion.assertValue(r, () -> data.stream(), true); + r = withData(data) + .terminal(s -> s.collectUnordered(collector)) + .parallelEqualityAsserter(this::nestedMapEqualityAssertion) + .exercise(); + assertion.assertValue(r, () -> data.stream(), false); + } + + private void nestedMapEqualityAssertion(Object o1, Object o2) { + if (o1 instanceof Map) { + Map m1 = (Map) o1; + Map m2 = (Map) o2; + assertContentsUnordered(m1.keySet(), m2.keySet()); + for (Object k : m1.keySet()) + nestedMapEqualityAssertion(m1.get(k), m2.get(k)); + } + else if (o1 instanceof Collection) { + assertContentsUnordered(((Collection) o1), ((Collection) o2)); + } + else + assertEquals(o1, o2); } @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class) - public void testGroupBy(String name, StreamTestData<Integer> data) throws ReflectiveOperationException { + public void testSimpleGroupBy(String name, StreamTestData<Integer> data) throws ReflectiveOperationException { Function<Integer, Integer> classifier = i -> i % 3; // Single-level groupBy - exerciseTabulation(data, - groupingBy(classifier), - new GroupedMapAssertion<>(classifier, HashMap.class, - new CollectionAssertion<Integer>(ArrayList.class, true))); + exerciseMapTabulation(data, groupingBy(classifier), new GroupedMapAssertion<>(classifier, HashMap.class, + new CollectionAssertion<Integer>(ArrayList.class, true))); + exerciseMapTabulation(data, groupingByConcurrent(classifier), new GroupedMapAssertion<>(classifier, ConcurrentHashMap.class, + new CollectionAssertion<Integer>(ArrayList.class, true))); // With explicit constructors - exerciseTabulation(data, - groupingBy(classifier, TreeMap::new, HashSet::new), - new GroupedMapAssertion<>(classifier, TreeMap.class, - new CollectionAssertion<Integer>(HashSet.class, false))); + exerciseMapTabulation(data, + groupingBy(classifier, TreeMap::new).then(toCollection((Supplier<Collection>) HashSet::new)), + new GroupedMapAssertion<>(classifier, TreeMap.class, + new CollectionAssertion<Integer>(HashSet.class, false))); + exerciseMapTabulation(data, + groupingByConcurrent(classifier, ConcurrentSkipListMap::new) + .then(toCollection((Supplier<Collection>) HashSet::new)), + new GroupedMapAssertion<>(classifier, ConcurrentSkipListMap.class, + new CollectionAssertion<Integer>(HashSet.class, false))); } @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class) @@ -168,18 +226,54 @@ Function<Integer, Integer> classifier2 = i -> i % 23; // Two-level groupBy - exerciseTabulation(data, - groupingBy(classifier, groupingBy(classifier2)), - new GroupedMapAssertion<>(classifier, HashMap.class, - new GroupedMapAssertion<>(classifier2, HashMap.class, - new CollectionAssertion<Integer>(ArrayList.class, true)))); + exerciseMapTabulation(data, + groupingBy(classifier).then(groupingBy(classifier2)), + new GroupedMapAssertion<>(classifier, HashMap.class, + new GroupedMapAssertion<>(classifier2, HashMap.class, + new CollectionAssertion<Integer>(ArrayList.class, true)))); + // with concurrent as upstream + exerciseMapTabulation(data, + groupingByConcurrent(classifier).then(groupingBy(classifier2)), + new GroupedMapAssertion<>(classifier, ConcurrentHashMap.class, + new GroupedMapAssertion<>(classifier2, HashMap.class, + new CollectionAssertion<Integer>(ArrayList.class, true)))); + // with concurrent as downstream + exerciseMapTabulation(data, + groupingBy(classifier).then(groupingByConcurrent(classifier2)), + new GroupedMapAssertion<>(classifier, HashMap.class, + new GroupedMapAssertion<>(classifier2, ConcurrentHashMap.class, + new CollectionAssertion<Integer>(ArrayList.class, true)))); + // with concurrent as upstream and downstream + exerciseMapTabulation(data, + groupingByConcurrent(classifier).then(groupingByConcurrent(classifier2)), + new GroupedMapAssertion<>(classifier, ConcurrentHashMap.class, + new GroupedMapAssertion<>(classifier2, ConcurrentHashMap.class, + new CollectionAssertion<Integer>(ArrayList.class, true)))); // With explicit constructors - exerciseTabulation(data, - groupingBy(classifier, TreeMap::new, groupingBy(classifier2, TreeMap::new, HashSet::new)), - new GroupedMapAssertion<>(classifier, TreeMap.class, - new GroupedMapAssertion<>(classifier2, TreeMap.class, - new CollectionAssertion<Integer>(HashSet.class, false)))); + exerciseMapTabulation(data, + groupingBy(classifier, TreeMap::new).then(groupingBy(classifier2, TreeMap::new).then(toCollection((Supplier<Collection>) HashSet::new))), + new GroupedMapAssertion<>(classifier, TreeMap.class, + new GroupedMapAssertion<>(classifier2, TreeMap.class, + new CollectionAssertion<Integer>(HashSet.class, false)))); + // with concurrent as upstream + exerciseMapTabulation(data, + groupingByConcurrent(classifier, ConcurrentSkipListMap::new).then(groupingBy(classifier2, TreeMap::new)), + new GroupedMapAssertion<>(classifier, ConcurrentSkipListMap.class, + new GroupedMapAssertion<>(classifier2, TreeMap.class, + new CollectionAssertion<Integer>(ArrayList.class, true)))); + // with concurrent as downstream + exerciseMapTabulation(data, + groupingBy(classifier, TreeMap::new).then(groupingByConcurrent(classifier2, ConcurrentSkipListMap::new)), + new GroupedMapAssertion<>(classifier, TreeMap.class, + new GroupedMapAssertion<>(classifier2, ConcurrentSkipListMap.class, + new CollectionAssertion<Integer>(ArrayList.class, true)))); + // with concurrent as upstream and downstream + exerciseMapTabulation(data, + groupingByConcurrent(classifier, ConcurrentSkipListMap::new).then(groupingByConcurrent(classifier2, ConcurrentSkipListMap::new)), + new GroupedMapAssertion<>(classifier, ConcurrentSkipListMap.class, + new GroupedMapAssertion<>(classifier2, ConcurrentSkipListMap.class, + new CollectionAssertion<Integer>(ArrayList.class, true)))); } @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class) @@ -187,27 +281,47 @@ Function<Integer, Integer> classifier = i -> i % 3; // Single-level simple reduce - exerciseTabulation(data, - groupingReduce(classifier, Integer::sum), - new GroupedMapAssertion<>(classifier, HashMap.class, - new ReduceAssertion<>(Functions.identity(), Integer::sum))); + exerciseMapTabulation(data, + groupingBy(classifier).thenReducing(Integer::sum), + new GroupedMapAssertion<>(classifier, HashMap.class, + new ReduceAssertion<>(Functions.identity(), Integer::sum))); + // with concurrent + exerciseMapTabulation(data, + groupingByConcurrent(classifier).thenReducing(Integer::sum), + new GroupedMapAssertion<>(classifier, ConcurrentHashMap.class, + new ReduceAssertion<>(Functions.identity(), Integer::sum))); // With explicit constructors - exerciseTabulation(data, - groupingReduce(classifier, () -> new TreeMap<>(), Integer::sum), - new GroupedMapAssertion<>(classifier, TreeMap.class, - new ReduceAssertion<>(Functions.identity(), Integer::sum))); + exerciseMapTabulation(data, + groupingBy(classifier, TreeMap::new).thenReducing(Integer::sum), + new GroupedMapAssertion<>(classifier, TreeMap.class, + new ReduceAssertion<>(Functions.identity(), Integer::sum))); + // with concurrent + exerciseMapTabulation(data, + groupingByConcurrent(classifier, ConcurrentSkipListMap::new).thenReducing(Integer::sum), + new GroupedMapAssertion<>(classifier, ConcurrentSkipListMap.class, + new ReduceAssertion<>(Functions.identity(), Integer::sum))); // Single-level map-reduce - exerciseTabulation(data, - groupingReduce(classifier, mDoubler, Integer::sum), - new GroupedMapAssertion<>(classifier, HashMap.class, - new ReduceAssertion<>(mDoubler, Integer::sum))); + exerciseMapTabulation(data, + groupingBy(classifier).thenReducing(mDoubler, Integer::sum), + new GroupedMapAssertion<>(classifier, HashMap.class, + new ReduceAssertion<>(mDoubler, Integer::sum))); + // with concurrent + exerciseMapTabulation(data, + groupingByConcurrent(classifier).thenReducing(mDoubler, Integer::sum), + new GroupedMapAssertion<>(classifier, ConcurrentHashMap.class, + new ReduceAssertion<>(mDoubler, Integer::sum))); // With explicit constructors - exerciseTabulation(data, - groupingReduce(classifier, TreeMap::new, mDoubler, Integer::sum), - new GroupedMapAssertion<>(classifier, TreeMap.class, - new ReduceAssertion<>(mDoubler, Integer::sum))); + exerciseMapTabulation(data, + groupingBy(classifier, TreeMap::new).thenReducing(mDoubler, Integer::sum), + new GroupedMapAssertion<>(classifier, TreeMap.class, + new ReduceAssertion<>(mDoubler, Integer::sum))); + // with concurrent + exerciseMapTabulation(data, + groupingByConcurrent(classifier, ConcurrentSkipListMap::new).thenReducing(mDoubler, Integer::sum), + new GroupedMapAssertion<>(classifier, ConcurrentSkipListMap.class, + new ReduceAssertion<>(mDoubler, Integer::sum))); } }