changeset 7845:60998dc19cdf

More Collector spec (and a few new Collectors)
author briangoetz
date Mon, 08 Apr 2013 16:49:39 -0400
parents ff8c98bc4bc2
children cc55bc49dda8
files src/share/classes/java/util/stream/Collector.java src/share/classes/java/util/stream/Collectors.java src/share/classes/java/util/stream/package-info.java
diffstat 3 files changed, 238 insertions(+), 90 deletions(-) [+]
line wrap: on
line diff
--- a/src/share/classes/java/util/stream/Collector.java	Mon Apr 08 13:23:04 2013 -0400
+++ b/src/share/classes/java/util/stream/Collector.java	Mon Apr 08 16:49:39 2013 -0400
@@ -31,58 +31,106 @@
 import java.util.function.Supplier;
 
 /**
- * A <a href="package-summary.html#Reduction">reduction operation</a> that supports folding
- * input elements into a cumulative result.  The result may be a value or may be a mutable
- * result container.  Examples of operations accumulating results into a mutable result
- * container include: accumulating input elements into a {@code Collection}; concatenating
- * strings into a {@code StringBuilder}; computing summary information about elements such as
- * sum, min, max, or average; computing "pivot table" summaries such as "maximum valued
- * transaction by seller", etc.  Reduction operations can be performed either sequentially or
- * in parallel.
+ * A <a href="package-summary.html#Reduction">reduction operation</a> that
+ * supports folding input elements into a cumulative result.  The result may be
+ * a value or may be a mutable result container.  Examples of operations
+ * accumulating results into a mutable result container include: accumulating
+ * input elements into a {@code Collection}; concatenating strings into a
+ * {@code StringBuilder}; computing summary information about elements such as
+ * sum, min, max, or average; computing "pivot table" summaries such as "maximum
+ * valued transaction by seller", etc.  Reduction operations can be performed
+ * either sequentially or in parallel.
  *
- * <p>A {@code Collector} is specified by three functions that work together to manage
- * a result or result container.  They are: creation of an initial result, incorporating
- * a new data element into a result, and combining two results into one. The last function
- * -- combining two results into one -- is used during parallel operations, where subsets
- * of the input are collected in parallel, and then the subresults merged into a combined
- * result. The result may be a mutable container or a value.  If the result is mutable,
- * the accumulation and combination functions may either mutate their left argument and return
- * that (such as adding elements to a collection), or return a new result, in which case it
- * should not perform any mutation.
+ * <p>The following are examples of using the predefined {@code Collector}
+ * implementations in {@link Collectors} with the {@code Stream} API to perform
+ * mutable reduction tasks:
+ * <pre>{@code
+ *     // Accumulate elements into a List
+ *     List list = stream.collect(Collectors.toList());
  *
- * <p>Collectors also have a set of characteristics, including {@link Characteristics#CONCURRENT}
- * and {@link Characteristics#STRICTLY_MUTATIVE}.  These characteristics provide hints that
- * can be used by a reduction implementation to provide better performance.
+ *     // Accumulate elements into a TreeSet
+ *     List list = stream.collect(Collectors.toCollection(TreeSet::new));
+ *
+ *     // Convert elements to strings and concatenate them, separated by commas
+ *     String joined = stream.map(Object::toString)
+ *                           .collect(Collectors.toStringJoiner(", "))
+ *                           .toString();
+ *
+ *     // Find highest-paid employee
+ *     Employee highestPaid = employees.stream()
+ *                                     .collect(Collectors.maxBy(Comparators.comparing(Employee::getSalary)));
+ *
+ *     // Group employees by department
+ *     Map<Department, List<Employee>> byDept
+ *         = employees.stream()
+ *                    .collect(Collectors.groupingBy(Employee::getDepartment));
+ *
+ *     // Find highest-paid employee by department
+ *     Map<Department, Employee> highestPaidByDept
+ *         = employees.stream()
+ *                    .collect(Collectors.groupingBy(Employee::getDepartment,
+ *                                                   Collectors.maxBy(Comparators.comparing(Employee::getSalary))));
+ *
+ *     // Partition students into passing and failing
+ *     Map<Boolean, List<Student>> passingFailing =
+ *         students.stream()
+ *                 .collect(Collectors.partitioningBy(s -> s.getGrade() >= PASS_THRESHOLD);
+ *
+ * }</pre>
+ *
+ * <p>A {@code Collector} is specified by three functions that work together to
+ * manage a result or result container.  They are: creation of an initial
+ * result, incorporating a new data element into a result, and combining two
+ * results into one. The last function -- combining two results into one -- is
+ * used during parallel operations, where subsets of the input are accumulated
+ * in parallel, and then the subresults merged into a combined result. The
+ * result may be a mutable container or a value.  If the result is mutable, the
+ * accumulation and combination functions may either mutate their left argument
+ * and return that (such as adding elements to a collection), or return a new
+ * result, in which case it should not perform any mutation.
+ *
+ * <p>Collectors also have a set of characteristics, including
+ * {@link Characteristics#CONCURRENT} and
+ * {@link Characteristics#STRICTLY_MUTATIVE}.  These characteristics provide
+ * hints that can be used by a reduction implementation to provide better
+ * performance.
  *
  * <p>Libraries that implement reduction based on {@code Collector}, such as
  * {@link Stream#collect(Collector)}, must adhere to the following constraints:
  * <ul>
- *     <li>The first argument passed to the accumulator function, and both arguments passed
- *     to the combiner function, must be the result of of a previous invocation of
- *     {@link #resultSupplier()}, {@link #accumulator()}, or {@link #combiner()}.</li>
- *     <li>The implementation should not do anything with the result of any of the result
- *     supplier, accumulator, or combiner functions other than to pass them again to the
- *     accumulator or combiner functions, or return them to the caller of the reduction
- *     operation.</li>
- *     <li>If a result is passed to the accumulator or combiner function, and the same object
- *     is not returned from that function, it is never used again.</li>
- *     <li>Once a result is passed to the combiner function, it is never passed to the
- *     accumulator function again.</li>
- *     <li>For non-concurrent collectors, any result returned from the result supplier,
- *     accumulator, or combiner functions must be serially thread-confined.  This enables
- *     collection to occur in parallel without the {@code Collector} needing to implement
- *     any additional synchronization.  The reduction implementation must manage that
- *     the input is properly partitioned, that partitions are processed in isolation,
- *     and combining happens only after accumulation is complete.</li>
- *     <li>For concurrent collectors, an implementation is free to (but not required to)
- *     implement reduction concurrently.  A concurrent collection is one where the
- *     accumulator function is called concurrently from multiple threads, rather than
- *     keeping the result isolated during accumulation.</li>
+ *     <li>The first argument passed to the accumulator function, and both
+ *     arguments passed to the combiner function, must be the result of a
+ *     previous invocation of {@link #resultSupplier()}, {@link #accumulator()},
+ *     or {@link #combiner()}.</li>
+ *     <li>The implementation should not do anything with the result of any of
+ *     the result supplier, accumulator, or combiner functions other than to
+ *     pass them again to the accumulator or combiner functions, or return them
+ *     to the caller of the reduction operation.</li>
+ *     <li>If a result is passed to the accumulator or combiner function, and
+ *     the same object is not returned from that function, it is never used
+ *     again.</li>
+ *     <li>Once a result is passed to the combiner function, it is never passed
+ *     to the accumulator function again.</li>
+ *     <li>For non-concurrent collectors, any result returned from the result
+ *     supplier, accumulator, or combiner functions must be serially
+ *     thread-confined.  This enables collection to occur in parallel without
+ *     the {@code Collector} needing to implement any additional synchronization.
+ *     The reduction implementation must manage that the input is properly
+ *     partitioned, that partitions are processed in isolation, and combining
+ *     happens only after accumulation is complete.</li>
+ *     <li>For concurrent collectors, an implementation is free to (but not
+ *     required to) implement reduction concurrently.  A concurrent reduction
+ *     is one where the accumulator function is called concurrently from
+ *     multiple threads, using the same concurrently-modifiable result container,
+ *     rather than keeping the result isolated during accumulation.
+ *     A concurrent reduction should only be applied if the collector has the
+ *     {@link Characteristics#UNORDERED} characteristics or if the
+ *     originating data is unordered.</li>
  * </ul>
  *
  * @apiNote
- * <p>Performing a reduction operation with a {@code Collector} should produce a result
- * equivalent to:
+ * <p>Performing a reduction operation with a {@code Collector} should produce a
+ * result equivalent to:
  * <pre>{@code
  *     BiFunction<R,T,R> accumulator = collector.accumulator();
  *     R result = collector.resultSupplier().get();
@@ -91,19 +139,21 @@
  *     return result;
  * }</pre>
  *
- * However, the library is free to partition the input, perform the reduction on the partitions,
- * and then use the combiner function to combine the partial results to achieve a parallel
- * reduction.  Depending on the specific reduction operation, this may perform better or worse,
- * depending on the relative cost of the accumulator and combiner functions.
+ * However, the library is free to partition the input, perform the reduction on
+ * the partitions, and then use the combiner function to combine the partial
+ * results to achieve a parallel reduction.  Depending on the specific reduction
+ * operation, this may perform better or worse, depending on the relative cost
+ * of the accumulator and combiner functions.
  *
- * <p>An example of an operation that can be easily modeled by {@code Collector} is accumulating
- * elements into a {@code TreeSet}. In this case, the {@code resultSupplier()} function is
- * {@code () -> new Treeset<T>()}, the {@code accumulator} function is
- * {@code (set, element) -> { set.add(element); return set; }}, and the combiner function is
- * {@code (left, right) -> { left.addAll(right); return left; }}.  (This behavior is
- * implemented by the method {@code Collectors.toCollection(TreeSet::new)}).
+ * <p>An example of an operation that can be easily modeled by {@code Collector}
+ * is accumulating elements into a {@code TreeSet}. In this case, the {@code
+ * resultSupplier()} function is {@code () -> new Treeset<T>()}, the
+ * {@code accumulator} function is
+ * {@code (set, element) -> { set.add(element); return set; }}, and the combiner
+ * function is {@code (left, right) -> { left.addAll(right); return left; }}.
+ * (This behavior is implemented by
+ * {@code Collectors.toCollection(TreeSet::new)}).
  *
- * TODO Document concurrent behavior and interaction with ordering
  * TODO Associativity and commutativity
  *
  * @see Stream#collect(Collector)
@@ -131,7 +181,7 @@
      *
      * <p>If the collector has the {@link Characteristics#STRICTLY_MUTATIVE} characteristic,
      * then the accumulator function <em>must</em> always return its first argument, after
-     * possibly  mutating its state.
+     * possibly mutating its state.
      *
      * @return A function which folds a new value into a cumulative result
      */
@@ -170,12 +220,10 @@
          * Indicates that this collector is <em>concurrent</em>, meaning that the result
          * container can support the accumulator function being called concurrently with
          * the same result container from multiple threads. Concurrent collectors must also
-         * always have the STRICTLY_MUTATIVE characteristic.
+         * always have the {@code STRICTLY_MUTATIVE} characteristic.
          *
-         * <p>Because a concurrent collection cannot guarantee that the elements will be
-         * presented to the accumulator function in encounter order, a concurrent collector
-         * must represent a combining operation that is not only
-         * <a href="package-summary.html#Associativity">associative</a>, but also commutative.
+         * <p>If this collector is not also {@code UNORDERED}, then it should
+         * only be evaluated concurrently if applied to an unordered data source.
          */
         CONCURRENT,
         /**
--- a/src/share/classes/java/util/stream/Collectors.java	Mon Apr 08 13:23:04 2013 -0400
+++ b/src/share/classes/java/util/stream/Collectors.java	Mon Apr 08 16:49:39 2013 -0400
@@ -29,6 +29,8 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Comparator;
+import java.util.Comparators;
 import java.util.DoubleSummaryStatistics;
 import java.util.EnumSet;
 import java.util.HashMap;
@@ -322,11 +324,65 @@
      * @return A {@code Collector} that counts its input elements
      */
     public static<T> Collector<T, Long>
-    counting() {
+    count() {
         return reducing(0L, e -> 1L, Long::sum);
     }
 
     /**
+     * Produces a {@code Collector<T, T>} that produces the minimal element
+     * according to a given {@code Comparator}.
+     *
+     * @implSpec
+     * This produces a result equivalent to:
+     * <pre>{@code
+     *     reducing(Comparators.lesserOf(comparator))
+     * }</pre>
+     * @param <T> The type of the input elements
+     * @param comparator A {@code Comparator} for comparing elements
+     * @return A {@code Collector} that produces the maximal value
+     */
+    public static<T> Collector<T, T>
+    minBy(Comparator<? super T> comparator) {
+        return reducing(Comparators.lesserOf(comparator));
+    }
+
+    /**
+     * Produces a {@code Collector<T, T>} that produces the maximal element
+     * according to a given {@code Comparator}.
+     *
+     * @implSpec
+     * This produces a result equivalent to:
+     * <pre>{@code
+     *     reducing(Comparators.greaterOf(comparator))
+     * }</pre>
+     * @param <T> The type of the input elements
+     * @param comparator A {@code Comparator} for comparing elements
+     * @return A {@code Collector} that produces the maximal value
+     */
+    public static<T> Collector<T, T>
+    maxBy(Comparator<? super T> comparator) {
+        return reducing(Comparators.greaterOf(comparator));
+    }
+
+    /**
+     * Produces a {@code Collector<T, Long>} that produces the sum of an
+     * long-valued function applied to the input element.
+     *
+     * @implSpec
+     * This produces a result equivalent to:
+     * <pre>{@code
+     *     reducing(0L, mapper, Long::sum)
+     * }</pre>
+     * @param <T> The type of the input elements
+     * @param mapper A function extracting the property to be summed
+     * @return A {@code Collector} that produces the sum of a derived property
+     */
+    public static<T> Collector<T, Long>
+    sumBy(Function<? super T, Long> mapper) {
+        return reducing(0L, mapper, Long::sum);
+    }
+
+    /**
      * Given a {@code BinaryOperator<T>}, return a {@code Collector<T,T>} which calculates the
      * reduction of its input elements under the specified {@code BinaryOperator}.
      *
@@ -353,6 +409,30 @@
     }
 
     /**
+     * Given a {@code BinaryOperator<T>}, return a {@code Collector<T,T>} which calculates the
+     * reduction of its input elements under the specified {@code BinaryOperator}.
+     *
+     * @apiNote
+     * The {@code reducing()} collectors are most useful when used in a multi-level collection
+     * following a {@code groupingBy} or {@code partitioningBy} collection; if you want to perform
+     * a simple reduction on a stream, use {@link Stream#reduce(BinaryOperator)}.
+     * For example, given a stream of {@code Person}, to calculate tallest person in each city:
+     * <pre>{@code
+     *     Comparator<Person> byHeight = Comparators.comparing(Person::getHeight);
+     *     BinaryOperator<Person> tallerOf = Comparators.greaterOf(byHeight);
+     *     Map<City, Person> tallestByCity
+     *         = people.stream().collect(groupingBy(Person::getCity, reducing(tallerOf)));
+     * }</pre>
+     * @param op A {@code BinaryOperator<T>} used to reduce the input elements
+     * @return A {@code Collector} which implements the reduction operation
+     * @see #reducing(Object, Function, BinaryOperator)
+     */
+    public static <T> Collector<T, T>
+    reducing(BinaryOperator<T> op) {
+        return reducing(null, op);
+    }
+
+    /**
      * Given a {@code BinaryOperator<U>} and a {@code Function<T,U>}, return a {@code Collector<T,U>}
      * which calculates the reduction of the input elements after applying the mapping function.
      * This is a generalization of {@link #reducing(Object, BinaryOperator)} which allows a transformation of
@@ -500,7 +580,7 @@
     /**
      * Returns a {@code Collector} that implements a concurrent "group by" operation on input
      * elements of type {@code T}.
-     * <p>This is a <em>concurrent</em> Collector.  (TODO need reference).
+     * <p>This is a {@link Collector.Characteristics#CONCURRENT concurrent} Collector.
      *
      * <p>Accepts a classification function from {@code T} to {@code K}.  The collector produces
      * a {@code ConcurrentMap} whose keys are the set of values resulting of applying the
@@ -524,7 +604,7 @@
     /**
      * Returns a {@code Collector} that implements a concurrent "group by" operation on input
      * elements of type {@code T}, resulting in a {@code Map} of a specific type.
-     * <p>This is a <em>concurrent</em> Collector.  (TODO need reference).
+     * <p>This is a {@link Collector.Characteristics#CONCURRENT concurrent} Collector.
      *
      * <p>Accepts a classification function from {@code T} to {@code K}, and a factory function
      * which produces a {@code ConcurrentMap} of the desired type.  The collector populates
@@ -554,7 +634,7 @@
      * Returns a {@code Collector} that implements a concurrent cascaded "group by" operation on
      * input elements of type {@code T}, resulting in a {@code ConcurrentMap} whose values are
      * the result of another reduction, resulting in a {@code ConcurrentMap} of a specific type.
-     * <p>This is a <em>concurrent</em> Collector.  (TODO need reference).
+     * <p>This is a {@link Collector.Characteristics#CONCURRENT concurrent} Collector.
      *
      * <p>Accepts a classification function from {@code T} to {@code K} and a {@code Collector}
      * which implements another reduction on elements of type {@code T}.  The collector populates
@@ -582,7 +662,7 @@
      * Returns a {@code Collector} that implements a cascaded concurrent "group by" operation on
      * input elements of type {@code T}, resulting in a {@code ConcurrentMap} whose values are
      * the result of another reduction.
-     * <p>This is a <em>concurrent</em> Collector.  (TODO need reference).
+     * <p>This is a {@link Collector.Characteristics#CONCURRENT concurrent} Collector.
      *
      * <p>Accepts a classification function from {@code T} to {@code K}, a factory function
      * which produces a {@code ConcurrentMap} of the desired type, and a {@code Collector} which
@@ -766,7 +846,7 @@
      * (according to {@link Object#equals(Object)}), an {@code IllegalStateException} is thrown when the
      * collection operation is performed.
      *
-     * <p>This is a <em>concurrent</em> Collector.  (TODO need reference).
+     * <p>This is a {@link Collector.Characteristics#CONCURRENT concurrent} Collector.
      *
      * @param <T> The type of the input elements, and the input type of the mapping function
      * @param <U> The output type of the mapping function
@@ -785,7 +865,7 @@
      * (according to {@link Object#equals(Object)}), the mapping function is applied to each equal element, and the
      * results are merged with the provided merging function.
      *
-     * <p>This is a <em>concurrent</em> Collector.  (TODO need reference).
+     * <p>This is a {@link Collector.Characteristics#CONCURRENT concurrent} Collector.
      *
      * @param mapper The mapping function
      * @param mapSupplier A function which provides a new, empty {@code Map} into which the results will be inserted
--- a/src/share/classes/java/util/stream/package-info.java	Mon Apr 08 13:23:04 2013 -0400
+++ b/src/share/classes/java/util/stream/package-info.java	Mon Apr 08 16:49:39 2013 -0400
@@ -425,42 +425,61 @@
  *                                       .collect(Collectors.toList());
  * }</pre>
  *
- * <h3><a name="ConcurrentReduction">Reduction, concurrency, and ordering</a></h3>
+ * <h3><a name="ConcurrentReduction">Reduction, Concurrency, and Ordering</a></h3>
  *
- * With some complex reduction operations, such as those that produce a
+ * With some complex reduction operations, for example a collect that produces a
  * {@code Map}, such as:
  * <pre>{@code
  *     Map<Buyer, List<Transaction>> salesByBuyer
- *         = txns.stream().collect(groupingBy(Transaction::getBuyer));
+ *         = txns.parallelStream()
+ *               .collect(Collectors.groupingBy(Transaction::getBuyer));
  * }</pre>
- *
- * it may actually be counterproductive to perform this reduction in parallel,
- * because the merging step (merging one {@code Map} into another by key)
+ * (where {@link java.util.stream.Collectors#groupingBy} is a utility function
+ * that returns a {@link Collector} for grouping sets of elements based on some key)
+ * it may actually be counterproductive to perform the operation in parallel.
+ * This is because the combining step (merging one {@code Map} into another by key)
  * can be expensive for some {@code Map} implementations.
  *
- * <p>If you are willing to relax the constraint or ordering, there is another
- * possibility -- perform a <em>concurrent</em> reduction.  This can be done if
- * the result container can safely be updated concurrently, such as one that
- * collects to a {@code ConcurrentHashMap}.  If it is important that the
- * elements for a given key appear in the order they appear in the source, then
- * we are constrained to implement either a sequential reduction or a
- * merge-based parallel reduction.  But, if this ordering constraint is
- * relaxed, we can also choose to have a shared result container, and let many
- * threads update the result container at once, obviating the need for the
- * expensive merging step.  The
- * {@link java.util.stream.Stream#collect(Supplier, BiConsumer, BiConsumer)}
- * implementation will choose a concurrent collection if the stream is
- * parallel, the {@link java.util.stream.Collector} has the
+ * <p>Suppose, however, that the result container used in this reduction
+ * was a concurrently modifiable collection -- such as a
+ * {@link java.util.concurrent.ConcurrentHashMap ConcurrentHashMap}. In that case,
+ * the parallel invocations of the accumulator could actually deposit their results
+ * concurrently into the same shared result container, elminating the need for the combiner to
+ * merge distinct result containers. This potentially provides a boost
+ * to the parallel execution performance. We call this a <em>concurrent</em> reduction.
+ *
+ * <p>A {@link Collector} that supports concurrent reduction is marked with the
+ * {@link java.util.stream.Collector.Characteristics.CONCURRENT} characteristic. 
+ * Having a concurrent collector is a necessary condition for performing a 
+ * concurrent reduction, but that alone is not sufficient. If you imagine multiple
+ * accumulators depositing results into a shared container, the order in which 
+ * results are deposited is non-deterministic. Consequently, a concurrent reduction
+ * is only possible if ordering is not important for the stream being processed.
+ * The {@link java.util.stream.Stream#collect(Collector)}
+ * implementation will only perform a concurrent reduction if 
+ * <ul>
+ * <li>The stream is parallel</li>;
+ * <li>The collector has the
  * {@link java.util.stream.Collector.Characteristics.CONCURRENT} characteristic,
- * and either the stream is unordered or the collector has the
- * {@link java.util.stream.Collector.Characteristics.UNORDERED} characteristic,
- * as in:
+ * and;</li>
+ * <li>Either the stream is unordered, or the collector has the
+ * {@link java.util.stream.Collector.Characteristics.UNORDERED} characteristic.
+ * </ul>
+ * For example:
  * <pre>{@code
  *     Map<Buyer, List<Transaction>> salesByBuyer
- *         = txns.stream()
+ *         = txns.parallelStream()
  *               .unordered()
  *               .collect(groupingByConcurrent(Transaction::getBuyer));
  * }</pre>
+ * (where {@link java.util.stream.Collectors#groupingByConcurrent} is the concurrent companion
+ * to {@code groupingBy}).
+ *
+ * <p>Note that if it is important that the elements for a given key appear in the
+ * order they appear in the source, then we cannot use a concurrent reduction,
+ * as ordering is one of the casualties of concurrent insertion.  We would then
+ * be constrained to implement either a sequential reduction or a merge-based
+ * parallel reduction.
  *
  * <a name="Associativity"><h2>Associativity</h2></a>
  *
@@ -475,6 +494,7 @@
  * So we can evaluate {@code (a op b)} in parallel with {@code (c op d)} and then invoke {@code op} on
  * the results.
  * TODO what does associative mean for mutative combining functions?
+ * FIXME: we described mutative associativity above.
  *
  * <h2><a name="StreamSources">Stream sources</a></h2>
  * TODO where does this section go?