changeset 6380:b734873a4c79

- ensure PARALLEL flag is cleared if sequential evaluation is performed because the stream is pulled. - Flag tests need to use test data that supports parallel streams (ArrayList temporarily does not currently support parallel streams). - Added flag test that explicit tests that SIZED and ORDERED are preserved for stateful operations when evaluated in parallel.
author psandoz
date Wed, 07 Nov 2012 13:24:34 +0100
parents a5509d6347cf
children 9de13de7fc77
files src/share/classes/java/util/streams/AbstractPipeline.java src/share/classes/java/util/streams/StreamOpFlags.java test-ng/tests/org/openjdk/tests/java/util/streams/ops/FlagOpTest.java
diffstat 3 files changed, 86 insertions(+), 12 deletions(-) [+]
line wrap: on
line diff
--- a/src/share/classes/java/util/streams/AbstractPipeline.java	Wed Nov 07 11:42:56 2012 +0000
+++ b/src/share/classes/java/util/streams/AbstractPipeline.java	Wed Nov 07 13:24:34 2012 +0100
@@ -127,18 +127,18 @@
     }
 
     protected<R> R evaluate(TerminalOp<E_OUT, R> terminal) {
-        // @@@ NYI If the source size estimate is small, don't bother going parallel
         if (StreamOpFlags.PARALLEL.isKnown(sourceFlags)) {
             return evaluateParallel(terminal);
         }
         else
-            return evaluateSequential(terminal);
+            return evaluateSequential(terminal, sourceFlags);
     }
 
     protected<R> R evaluateParallel(TerminalOp<E_OUT, R> terminal) {
         if (isPipelinePulled()) {
             // If already pulled then cannot split, revert back to sequential evaluation
-            return evaluateSequential(terminal);
+            // Ensure the parallel flag is cleared
+            return evaluateSequential(terminal, StreamOpFlags.PARALLEL.clearFromFlags(sourceFlags));
         }
 
         final IntermediateOp[] ops = ops();
@@ -204,8 +204,8 @@
     }
 
     @SuppressWarnings("unchecked")
-    protected <R> R evaluateSequential(TerminalOp<E_OUT, R> terminal) {
-        return (R) terminal.evaluateSequential(new SequentialImplPipelineHelperSource());
+    protected <R> R evaluateSequential(TerminalOp<E_OUT, R> terminal, int sourceFlags) {
+        return (R) terminal.evaluateSequential(new SequentialImplPipelineHelperSource(sourceFlags));
     }
 
     static abstract class AbstractPipelineHelper<P_IN, P_OUT> implements PipelineHelper<P_IN, P_OUT> {
@@ -343,8 +343,8 @@
 
     class SequentialImplPipelineHelperSource extends SequentialImplPipelineHelper {
 
-        <R> SequentialImplPipelineHelperSource() {
-            super(AbstractPipeline.this.spliterator, AbstractPipeline.this.sourceFlags, ops());
+        <R> SequentialImplPipelineHelperSource(int sourceFlags) {
+            super(AbstractPipeline.this.spliterator, sourceFlags, ops());
         }
 
         @Override
@@ -459,6 +459,8 @@
                 pipes[0].iterator = spliterator.iterator();
             }
 
+            // Ensure the parallel flag is cleared, if set
+            int sourceFlags = StreamOpFlags.PARALLEL.clearFromFlags(this.sourceFlags);
             int opsFlags = StreamOpFlags.INITIAL_OPS_VALUE;
             for (int i = 1; i <= depth; i++) {
                 p = pipes[i];
--- a/src/share/classes/java/util/streams/StreamOpFlags.java	Wed Nov 07 11:42:56 2012 +0000
+++ b/src/share/classes/java/util/streams/StreamOpFlags.java	Wed Nov 07 13:24:34 2012 +0100
@@ -146,6 +146,16 @@
     }
 
     /**
+     * Clear this flag.
+     *
+     * @param flags the flags from which to clear this flag
+     * @return the flags with this flag cleared.
+     */
+    public int clearFromFlags(int flags) {
+        return flags & ~set;
+    }
+
+    /**
      * The initial value to be combined with the flags of the first operation in the pipeline.
      */
     public static final int INITIAL_OPS_VALUE = createMask(0b11, 0b00); // 0b00_11_11_11_11_11;
--- a/test-ng/tests/org/openjdk/tests/java/util/streams/ops/FlagOpTest.java	Wed Nov 07 11:42:56 2012 +0000
+++ b/test-ng/tests/org/openjdk/tests/java/util/streams/ops/FlagOpTest.java	Wed Nov 07 13:24:34 2012 +0100
@@ -25,6 +25,7 @@
 package org.openjdk.tests.java.util.streams.ops;
 
 import org.openjdk.tests.java.util.streams.StreamTestDataProvider;
+import org.testng.Assert;
 import org.testng.annotations.Test;
 
 import java.util.*;
@@ -119,7 +120,7 @@
                                          EnumSet.noneOf(StreamOpFlags.class),
                                          notKnown.clone()));
 
-        TestData<Integer> data = new CollectionTestData<>("List", countTo(10));
+        TestData<Integer> data = new ArrayTestData<>("Array", countTo(10).toArray(new Integer[0]));
         @SuppressWarnings("rawtypes")
         FlagDeclaringOp[] opsArray = ops.toArray(new FlagDeclaringOp[ops.size()]);
 
@@ -152,7 +153,7 @@
                                          EnumSet.noneOf(StreamOpFlags.class),
                                          notKnown.clone()));
 
-        TestData<Integer> data = new CollectionTestData<>("List", countTo(10));
+        TestData<Integer> data = new ArrayTestData<>("Array", countTo(10).toArray(new Integer[0]));
         @SuppressWarnings("rawtypes")
         FlagDeclaringOp[] opsArray = ops.toArray(new FlagDeclaringOp[ops.size()]);
 
@@ -187,7 +188,7 @@
                                          preserve.clone(),
                                          EnumSet.noneOf(StreamOpFlags.class)));
 
-        TestData<Integer> data = new CollectionTestData<>("List", countTo(10));
+        TestData<Integer> data = new ArrayTestData<>("Array", countTo(10).toArray(new Integer[0]));
         @SuppressWarnings("rawtypes")
         IntermediateOp[] opsArray = ops.toArray(new IntermediateOp[ops.size()]);
 
@@ -223,11 +224,72 @@
                                          preserve.clone(),
                                          EnumSet.noneOf(StreamOpFlags.class)));
 
-        TestData<Integer> data = new CollectionTestData<>("List", countTo(10));
+        TestData<Integer> data = new ArrayTestData<>("Array", countTo(10).toArray(new Integer[0]));
         @SuppressWarnings("rawtypes")
         IntermediateOp[] opsArray = ops.toArray(new IntermediateOp[ops.size()]);
 
         testUsingData(data).without(IntermediateOpTest.PAR_STREAM_TO_ARRAY_CLEAR_SIZED).excerciseOps(opsArray);
     }
 
-}
+    public void testFlagsSizedOrderedParallelCollect() {
+        EnumSet<StreamOpFlags> parKnown = EnumSet.of(StreamOpFlags.ORDERED, StreamOpFlags.SIZED);
+        EnumSet<StreamOpFlags> serKnown = parKnown.clone();
+
+        List<IntermediateOp<Integer, Integer>> ops = new ArrayList<>();
+        for (StreamOpFlags f : parKnown) {
+            ops.add(CollectorOps.<Integer>parallelCollector());
+            ops.add(new ParSerTestFlagExpectedOp<>(f.clear(),
+                                             parKnown,
+                                             serKnown));
+            serKnown.remove(f);
+        }
+        ops.add(CollectorOps.<Integer>parallelCollector());
+        ops.add(new ParSerTestFlagExpectedOp<>(0,
+                                         parKnown,
+                                         EnumSet.noneOf(StreamOpFlags.class)));
+
+        TestData<Integer> data = new ArrayTestData<>("Array", countTo(10).toArray(new Integer[0]));
+        @SuppressWarnings("rawtypes")
+        IntermediateOp[] opsArray = ops.toArray(new IntermediateOp[ops.size()]);
+
+        testUsingData(data).excerciseOps(opsArray);
+    }
+
+    static class ParSerTestFlagExpectedOp<T> extends FlagDeclaringOp<T> {
+        final EnumSet<StreamOpFlags> parKnown;
+        final EnumSet<StreamOpFlags> serKnown;
+
+        ParSerTestFlagExpectedOp(int flags, EnumSet<StreamOpFlags> known, EnumSet<StreamOpFlags> serKnown) {
+            super(flags);
+            this.parKnown = known;
+            this.serKnown = serKnown;
+        }
+
+        @Override
+        public Iterator<T> wrapIterator(int flags, Iterator<T> downstream) {
+            assertFlags(flags);
+            return downstream;
+        }
+
+        @Override
+        @SuppressWarnings({"unchecked", "rawtypes"})
+        public Sink<T> wrapSink(int flags, Sink upstream) {
+            assertFlags(flags);
+            return upstream;
+        }
+
+        protected void assertFlags(int flags) {
+            if (StreamOpFlags.PARALLEL.isKnown(flags)) {
+                for (StreamOpFlags f : parKnown) {
+                    Assert.assertTrue(f.isKnown(flags), String.format("Flag %s is not known, but should be known.", f.toString()));
+                }
+
+            } else {
+                for (StreamOpFlags f : serKnown) {
+                    Assert.assertTrue(f.isKnown(flags), String.format("Flag %s is not known, but should be known.", f.toString()));
+                }
+
+            }
+        }
+    }
+}
\ No newline at end of file