changeset 7825:adc363b47e78

Add .unordered() operation; eliminate .collectUnordered()
author briangoetz
date Fri, 05 Apr 2013 12:09:30 -0400
parents e6ce826890e0
children cb2c69a86f0f
files src/share/classes/java/util/stream/AbstractPipeline.java src/share/classes/java/util/stream/BaseStream.java src/share/classes/java/util/stream/Collector.java src/share/classes/java/util/stream/DelegatingStream.java src/share/classes/java/util/stream/DoublePipeline.java src/share/classes/java/util/stream/IntPipeline.java src/share/classes/java/util/stream/LongPipeline.java src/share/classes/java/util/stream/ReferencePipeline.java src/share/classes/java/util/stream/Stream.java test-ng/bootlib/java/util/stream/OpTestCase.java test-ng/bootlib/java/util/stream/StreamTestData.java test-ng/tests/org/openjdk/tests/java/util/stream/TabulatorsTest.java
diffstat 12 files changed, 88 insertions(+), 47 deletions(-) [+]
line wrap: on
line diff
--- a/src/share/classes/java/util/stream/AbstractPipeline.java	Fri Apr 05 10:24:49 2013 -0400
+++ b/src/share/classes/java/util/stream/AbstractPipeline.java	Fri Apr 05 12:09:30 2013 -0400
@@ -411,6 +411,10 @@
         return combinedFlags;
     }
 
+    final boolean isOrdered() {
+        return StreamOpFlag.ORDERED.isKnown(combinedFlags);
+    }
+
     @Override
     final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
         Objects.requireNonNull(sink);
--- a/src/share/classes/java/util/stream/BaseStream.java	Fri Apr 05 10:24:49 2013 -0400
+++ b/src/share/classes/java/util/stream/BaseStream.java	Fri Apr 05 12:09:30 2013 -0400
@@ -71,7 +71,7 @@
      * Produces an equivalent stream that is sequential.
      * If this stream is already sequential, may return itself.
      *
-     * <p>This is a <a href="package-summary.html#StreamOps">stateful intermediate operation</a>.
+     * <p>This is an <a href="package-summary.html#StreamOps">intermediate operation</a>.
      *
      * @return a sequential stream
      */
@@ -81,9 +81,18 @@
      * Produces an equivalent stream that is parallel.
      * If this stream is already parallel, may return itself.
      *
-     * <p>This is a <a href="package-summary.html#StreamOps">stateful intermediate operation</a>.
+     * <p>This is an <a href="package-summary.html#StreamOps">intermediate operation</a>.
      *
      * @return a parallel stream
      */
     S parallel();
+
+    /**
+     * Produces an equivalent stream that is unordered. If this stream is
+     * already unordered, may return itself.
+     *
+     * <p>This is an <a href="package-summary.html#StreamOps">intermediate operation</a>.
+     * @return an unordered stream
+     */
+    S unordered();
 }
--- a/src/share/classes/java/util/stream/Collector.java	Fri Apr 05 10:24:49 2013 -0400
+++ b/src/share/classes/java/util/stream/Collector.java	Fri Apr 05 12:09:30 2013 -0400
@@ -107,7 +107,6 @@
  * TODO Associativity and commutativity
  *
  * @see Stream#collect(Collector)
- * @see Stream#collectUnordered(Collector)
  * @see Collectors
  *
  * @param <T> The type of input element to the collect operation
--- a/src/share/classes/java/util/stream/DelegatingStream.java	Fri Apr 05 10:24:49 2013 -0400
+++ b/src/share/classes/java/util/stream/DelegatingStream.java	Fri Apr 05 12:09:30 2013 -0400
@@ -33,7 +33,6 @@
 import java.util.function.BiConsumer;
 import java.util.function.BiFunction;
 import java.util.function.BinaryOperator;
-import java.util.function.BooleanSupplier;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.IntFunction;
@@ -217,11 +216,6 @@
     }
 
     @Override
-    public <R> R collectUnordered(Collector<? super T, R> collector) {
-        return delegate.collectUnordered(collector);
-    }
-
-    @Override
     public Optional<T> max(Comparator<? super T> comparator) {
         return delegate.max(comparator);
     }
@@ -262,6 +256,11 @@
     }
 
     @Override
+    public Stream<T> unordered() {
+        return delegate.unordered();
+    }
+
+    @Override
     public Stream<T> sequential() {
         return delegate.sequential();
     }
--- a/src/share/classes/java/util/stream/DoublePipeline.java	Fri Apr 05 10:24:49 2013 -0400
+++ b/src/share/classes/java/util/stream/DoublePipeline.java	Fri Apr 05 12:09:30 2013 -0400
@@ -263,6 +263,18 @@
     }
 
     @Override
+    public DoubleStream unordered() {
+        if (!isOrdered())
+            return this;
+        return new StatelessOp<Double>(this, StreamShape.DOUBLE_VALUE, StreamOpFlag.NOT_ORDERED) {
+            @Override
+            Sink<Double> opWrapSink(int flags, Sink<Double> sink) {
+                return sink;
+            }
+        };
+    }
+
+    @Override
     public final DoubleStream filter(DoublePredicate predicate) {
         Objects.requireNonNull(predicate);
         return new StatelessOp<Double>(this, StreamShape.DOUBLE_VALUE,
--- a/src/share/classes/java/util/stream/IntPipeline.java	Fri Apr 05 10:24:49 2013 -0400
+++ b/src/share/classes/java/util/stream/IntPipeline.java	Fri Apr 05 12:09:30 2013 -0400
@@ -291,6 +291,18 @@
     }
 
     @Override
+    public IntStream unordered() {
+        if (!isOrdered())
+            return this;
+        return new StatelessOp<Integer>(this, StreamShape.INT_VALUE, StreamOpFlag.NOT_ORDERED) {
+            @Override
+            Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) {
+                return sink;
+            }
+        };
+    }
+
+    @Override
     public final IntStream filter(IntPredicate predicate) {
         Objects.requireNonNull(predicate);
         return new StatelessOp<Integer>(this, StreamShape.INT_VALUE,
--- a/src/share/classes/java/util/stream/LongPipeline.java	Fri Apr 05 10:24:49 2013 -0400
+++ b/src/share/classes/java/util/stream/LongPipeline.java	Fri Apr 05 12:09:30 2013 -0400
@@ -275,6 +275,18 @@
     }
 
     @Override
+    public LongStream unordered() {
+        if (!isOrdered())
+            return this;
+        return new StatelessOp<Long>(this, StreamShape.LONG_VALUE, StreamOpFlag.NOT_ORDERED) {
+            @Override
+            Sink<Long> opWrapSink(int flags, Sink<Long> sink) {
+                return sink;
+            }
+        };
+    }
+
+    @Override
     public final LongStream filter(LongPredicate predicate) {
         Objects.requireNonNull(predicate);
         return new StatelessOp<Long>(this, StreamShape.LONG_VALUE,
--- a/src/share/classes/java/util/stream/ReferencePipeline.java	Fri Apr 05 10:24:49 2013 -0400
+++ b/src/share/classes/java/util/stream/ReferencePipeline.java	Fri Apr 05 12:09:30 2013 -0400
@@ -141,6 +141,18 @@
     // Stateless intermediate operations from Stream
 
     @Override
+    public Stream<U> unordered() {
+        if (!isOrdered())
+            return this;
+        return new StatelessOp<U, U>(this, StreamShape.REFERENCE, StreamOpFlag.NOT_ORDERED) {
+            @Override
+            Sink<U> opWrapSink(int flags, Sink<U> sink) {
+                return sink;
+            }
+        };
+    }
+
+    @Override
     public final Stream<U> filter(Predicate<? super U> predicate) {
         Objects.requireNonNull(predicate);
         return new StatelessOp<U, U>(this, StreamShape.REFERENCE,
@@ -434,6 +446,14 @@
 
     @Override
     public final <R> R collect(Collector<? super U, R> collector) {
+        if (isParallel()
+                && (collector.characteristics().contains(Collector.Characteristics.CONCURRENT))
+                && (!isOrdered() || collector.characteristics().contains(Collector.Characteristics.UNORDERED))) {
+            R container = collector.resultSupplier().get();
+            BiFunction<R, ? super U, R> accumulator = collector.accumulator();
+            forEach(u -> accumulator.apply(container, u));
+            return container;
+        }
         return evaluate(ReduceOps.makeRef(collector));
     }
 
@@ -443,19 +463,6 @@
     }
 
     @Override
-    public final <R> R collectUnordered(Collector<? super U, R> collector) {
-        if (collector.characteristics().contains(Collector.Characteristics.CONCURRENT)) {
-            R container = collector.resultSupplier().get();
-            BiFunction<R, ? super U, R> accumulator = collector.accumulator();
-            forEach(u -> accumulator.apply(container, u));
-            return container;
-        }
-        else {
-            return collect(collector);
-        }
-    }
-
-    @Override
     public final Optional<U> max(Comparator<? super U> comparator) {
         return reduce(Comparators.greaterOf(comparator));
     }
--- a/src/share/classes/java/util/stream/Stream.java	Fri Apr 05 10:24:49 2013 -0400
+++ b/src/share/classes/java/util/stream/Stream.java	Fri Apr 05 12:09:30 2013 -0400
@@ -665,25 +665,6 @@
      */
     <R> R collect(Collector<? super T, R> collector);
 
-    /** Performs a <a href="package-summary.html#MutableReduction">mutable
-     * reduction</a> operation on the elements of this stream using a
-     * {@code Collector} object to describe the reduction, without regard to
-     * encounter order. If the provided {@code Collector} is concurrent, this
-     * implementation may invoke the function returned by
-     * {@link Collector#accumulator()} concurrently on the same result object.
-     * In some cases, implementing a reduction by concurrently modifying a
-     * shared data structure may be more efficient than partitioning and merging.
-     *
-     * <p>This is a <a href="package-summary.html#StreamOps">terminal operation</a>.
-     *
-     * @param collector The {@code Collector} describing the reduction
-     * @param <R> The type of the result
-     * @return The result of the reduction
-     * @see #collect(Supplier, BiConsumer, BiConsumer)
-     * @see Collectors
-     */
-    <R> R collectUnordered(Collector<? super T, R> collector);
-
     /**
      * Returns the maximal element of this stream according to the provided
      * {@code Comparator}.  This is a special case of a
--- a/test-ng/bootlib/java/util/stream/OpTestCase.java	Fri Apr 05 10:24:49 2013 -0400
+++ b/test-ng/bootlib/java/util/stream/OpTestCase.java	Fri Apr 05 12:09:30 2013 -0400
@@ -558,6 +558,10 @@
 
         //
 
+        default boolean isOrdered() {
+            return spliterator().hasCharacteristics(Spliterator.ORDERED);
+        }
+
         StreamShape getShape();
 
         default <A extends Collection<? super T>> A into(A target) {
--- a/test-ng/bootlib/java/util/stream/StreamTestData.java	Fri Apr 05 10:24:49 2013 -0400
+++ b/test-ng/bootlib/java/util/stream/StreamTestData.java	Fri Apr 05 12:09:30 2013 -0400
@@ -96,10 +96,8 @@
         }
 
         @Override
-        @SuppressWarnings({ "rawtypes", "unchecked" })
         public Spliterator<T> spliterator() {
-            // @@@ FIXME!
-            return Arrays.spliterator((T[]) collection.toArray());
+            return collection.spliterator();
         }
 
         @Override
--- a/test-ng/tests/org/openjdk/tests/java/util/stream/TabulatorsTest.java	Fri Apr 05 10:24:49 2013 -0400
+++ b/test-ng/tests/org/openjdk/tests/java/util/stream/TabulatorsTest.java	Fri Apr 05 12:09:30 2013 -0400
@@ -42,6 +42,7 @@
 import java.util.function.Supplier;
 import java.util.stream.Collector;
 import java.util.stream.Collectors;
+import java.util.stream.LambdaTestHelpers;
 import java.util.stream.OpTestCase;
 import java.util.stream.Stream;
 import java.util.stream.StreamOpFlagTestHelper;
@@ -197,10 +198,13 @@
     private<T, M extends Map> void exerciseMapTabulation(StreamTestData<T> data,
                                                          Collector<T, ? extends M> collector,
                                                          TabulationAssertion<T, M> assertion) throws ReflectiveOperationException {
-        M m = exerciseTerminalOps(data, s -> s.collect(collector));
-        assertion.assertValue(m, () -> data.stream(), true);
+        M m = withData(data)
+                .terminal(s -> s.collect(collector))
+                .parallelEqualityAsserter(data.isOrdered() ? LambdaTestHelpers::assertContentsEqual : this::nestedMapEqualityAssertion)
+                .exercise();
+        assertion.assertValue(m, () -> data.stream(), data.isOrdered());
         m = withData(data)
-                .terminal(s -> s.collectUnordered(collector))
+                .terminal(s -> s.unordered().collect(collector))
                 .parallelEqualityAsserter(this::nestedMapEqualityAssertion)
                 .exercise();
         assertion.assertValue(m, () -> data.stream(), false);