changeset 7821:137dc53c9f85

- Replace PARALLEL flag with a boolean value associated with the head stage of the pipeline. - Preparation of flags only required for parallel evaluation.
author psandoz
date Fri, 05 Apr 2013 11:25:17 +0200
parents e473df6aaa89
children 25f122a48151
files src/share/classes/java/util/stream/AbstractPipeline.java src/share/classes/java/util/stream/DoublePipeline.java src/share/classes/java/util/stream/IntPipeline.java src/share/classes/java/util/stream/LongPipeline.java src/share/classes/java/util/stream/ReferencePipeline.java src/share/classes/java/util/stream/StreamOpFlag.java src/share/classes/java/util/stream/Streams.java test-ng/bootlib/java/util/stream/CollectorOps.java test-ng/bootlib/java/util/stream/FlagDeclaringOp.java test-ng/bootlib/java/util/stream/IntermediateOp.java test-ng/bootlib/java/util/stream/IntermediateTestOp.java test-ng/bootlib/java/util/stream/LambdaTestHelpers.java test-ng/bootlib/java/util/stream/OpTestCase.java test-ng/bootlib/java/util/stream/StatefulOp.java test-ng/bootlib/java/util/stream/StatefulTestOp.java test-ng/bootlib/java/util/stream/StatelessTestOp.java test-ng/bootlib/java/util/stream/TestFlagExpectedOp.java test-ng/boottests/java/util/stream/FlagOpTest.java test-ng/boottests/java/util/stream/StreamFlagsTest.java test-ng/boottests/java/util/stream/StreamOpFlagsTest.java test-ng/boottests/java/util/stream/UnorderedTest.java test-ng/tests/org/openjdk/tests/java/util/stream/StreamSpliteratorTest.java
diffstat 22 files changed, 531 insertions(+), 645 deletions(-) [+]
line wrap: on
line diff
--- a/src/share/classes/java/util/stream/AbstractPipeline.java	Thu Apr 04 22:28:40 2013 -0400
+++ b/src/share/classes/java/util/stream/AbstractPipeline.java	Fri Apr 05 11:25:17 2013 +0200
@@ -112,29 +112,31 @@
      */
     private Supplier<? extends Spliterator<?>> sourceSupplier;
 
-    /** True if this pipeline has been consumed */
+    /** True if this pipeline has been linked or consumed */
     private boolean linkedOrConsumed;
 
     /** True if there are any stateful ops in the pipeline; only valid for the source stage */
     private boolean sourceAnyStateful;
 
-    /** True if there have been any calls to .sequential() or .parallel(); only valid for the source stage */
-    private boolean sourceAnyParChange;
+    /** True if pipeline is parallel, otherwise the pipeline is sequential; only valid for the source stage */
+    private boolean parallel;
 
     /**
      * Constructor for the head of a stream pipeline.
      *
      * @param source {@code Supplier<Spliterator>} describing the stream source
      * @param sourceFlags The source flags for the stream source, described in {@link StreamOpFlag}
+     * @param parallel True if the pipeline is parallel
      */
     AbstractPipeline(Supplier<? extends Spliterator<?>> source,
-                     int sourceFlags) {
+                     int sourceFlags, boolean parallel) {
         this.previousStage = null;
         this.sourceSupplier = source;
         this.sourceStage = this;
-        this.sourceOrOpFlags = StreamOpFlag.combineOpFlags(sourceFlags, StreamOpFlag.INITIAL_OPS_VALUE);
-        this.combinedFlags = sourceOrOpFlags;
+        this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK;
+        this.combinedFlags = StreamOpFlag.combineOpFlags(sourceOrOpFlags, StreamOpFlag.INITIAL_OPS_VALUE);
         this.depth = 0;
+        this.parallel = parallel;
     }
 
     /**
@@ -142,15 +144,17 @@
      *
      * @param source {@code Spliterator} describing the stream source
      * @param sourceFlags The source flags for the stream source, described in {@link StreamOpFlag}
+     * @param parallel True if the pipeline is parallel
      */
     AbstractPipeline(Spliterator<?> source,
-                     int sourceFlags) {
+                     int sourceFlags, boolean parallel) {
         this.previousStage = null;
-        this.sourceOrOpFlags = StreamOpFlag.combineOpFlags(sourceFlags, StreamOpFlag.INITIAL_OPS_VALUE);
         this.sourceSpliterator = source;
         this.sourceStage = this;
-        this.combinedFlags = sourceOrOpFlags;
+        this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK;
+        this.combinedFlags = StreamOpFlag.combineOpFlags(sourceOrOpFlags, StreamOpFlag.INITIAL_OPS_VALUE);
         this.depth = 0;
+        this.parallel = parallel;
     }
 
     /**
@@ -175,59 +179,6 @@
         this.depth = previousStage.depth + 1;
     }
 
-    /**
-     * Prepares the pipeline for evaluation.
-     * @param terminalFlags The terminal operation flags, described in {@link StreamOpFlag}
-     */
-    private void prepare(int terminalFlags) {
-        if (isParallel()) {
-            AbstractPipeline backPropagationHead = sourceStage;
-            if (sourceStage.sourceAnyStateful) {
-                int depth = 1;
-                for (AbstractPipeline u = sourceStage, p = sourceStage.nextStage;
-                     p != null;
-                     u = p, p = p.nextStage) {
-                    int thisOpFlags = p.sourceOrOpFlags;
-                    if (p.opIsStateful()) {
-                        // If the stateful operation is a short-circuit operation
-                        // then move the back propagation head forwards
-                        // NOTE: there are no size-injecting ops
-                        if (StreamOpFlag.SHORT_CIRCUIT.isKnown(thisOpFlags)) {
-                            backPropagationHead = p;
-                        }
-
-                        depth = 0;
-                        // The following injects size, it is equivalent to:
-                        // StreamOpFlag.combineOpFlags(StreamOpFlag.IS_SIZED, p.combinedFlags);
-                        thisOpFlags = (thisOpFlags & ~StreamOpFlag.NOT_SIZED) | StreamOpFlag.IS_SIZED;
-                    }
-                    p.depth = depth++;
-                    p.combinedFlags = StreamOpFlag.combineOpFlags(thisOpFlags, u.combinedFlags);
-                }
-            }
-
-            // Apply the upstream terminal flags
-            if (terminalFlags != 0) {
-                int upstreamTerminalFlags = terminalFlags & StreamOpFlag.UPSTREAM_TERMINAL_OP_MASK;
-                for (AbstractPipeline p = backPropagationHead; p != null; p = p.nextStage) {
-                    p.combinedFlags = StreamOpFlag.combineOpFlags(upstreamTerminalFlags, p.combinedFlags);
-                }
-            }
-        }
-        else {
-            if (sourceStage.sourceAnyParChange) {
-                for (AbstractPipeline u = sourceStage, p = sourceStage.nextStage;
-                     p != null;
-                     u = p, p = p.nextStage) {
-                    p.combinedFlags = StreamOpFlag.combineOpFlags(p.sourceOrOpFlags, u.combinedFlags);
-                }
-            }
-        }
-        // Update last stage to incorporate terminal flags
-        if (terminalFlags != 0)
-            combinedFlags = StreamOpFlag.combineOpFlags(terminalFlags, combinedFlags);
-    }
-
 
     // Terminal evaluation methods
 
@@ -244,10 +195,9 @@
             throw new IllegalStateException("stream has already been operated upon");
         linkedOrConsumed = true;
 
-        prepare(terminalOp.getOpFlags());
         return isParallel()
-               ? (R) terminalOp.evaluateParallel(this, sourceSpliterator())
-               : (R) terminalOp.evaluateSequential(this, sourceSpliterator());
+               ? (R) terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
+               : (R) terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
     }
 
     /**
@@ -261,34 +211,28 @@
             throw new IllegalStateException("stream has already been operated upon");
         linkedOrConsumed = true;
 
-        prepare(0);
         // If the last intermediate operation is stateful then
         // evaluate directly to avoid an extra collection step
         if (isParallel() && previousStage != null && opIsStateful()) {
-            return opEvaluateParallel(previousStage, previousStage.sourceSpliterator(), generator);
+            return opEvaluateParallel(previousStage, previousStage.sourceSpliterator(0), generator);
         }
         else {
-            return evaluate(sourceSpliterator(), true, generator);
+            return evaluate(sourceSpliterator(0), true, generator);
         }
     }
 
+
+    // BaseStream
+
     /** Implements {@link BaseStream#sequential()} */
     public final S sequential() {
-        if (StreamOpFlag.PARALLEL.isKnown(sourceStage.combinedFlags)) {
-            sourceStage.sourceAnyParChange = true;
-            sourceStage.combinedFlags = StreamOpFlag.combineOpFlags(StreamOpFlag.NOT_PARALLEL,
-                                                                    sourceStage.combinedFlags);
-        }
+        sourceStage.parallel = false;
         return (S) this;
     }
 
     /** Implements {@link BaseStream#parallel()} */
     public final S parallel() {
-        if (!StreamOpFlag.PARALLEL.isKnown(sourceStage.combinedFlags)) {
-            sourceStage.sourceAnyParChange = true;
-            sourceStage.combinedFlags = StreamOpFlag.combineOpFlags(StreamOpFlag.IS_PARALLEL,
-                                                                    sourceStage.combinedFlags);
-        }
+        sourceStage.parallel = true;
         return (S) this;
     }
 
@@ -299,7 +243,6 @@
             throw new IllegalStateException("stream has already been operated upon");
         linkedOrConsumed = true;
 
-        prepare(0);
         if (this == sourceStage) {
             if (sourceStage.sourceSpliterator != null) {
                 Spliterator<E_OUT> s = sourceStage.sourceSpliterator;
@@ -316,15 +259,18 @@
             }
         }
         else {
-            return wrap(this, () -> sourceSpliterator(), isParallel());
+            return wrap(this, () -> sourceSpliterator(0), isParallel());
         }
     }
 
     /** Implements {@link BaseStream#isParallel()} */
     public final boolean isParallel() {
-        return StreamOpFlag.PARALLEL.isKnown(sourceStage.combinedFlags);
+        return sourceStage.parallel;
     }
 
+
+    //
+
     /**
      * Returns the composition of stream flags of the stream source and all
      * intermediate operations.
@@ -336,17 +282,53 @@
     final int getStreamFlags() {
         // @@@ Currently only used by tests, review and see if functionality
         //     can be replaced by spliterator().characteristics()
-        prepare(0);
         return StreamOpFlag.toStreamFlags(combinedFlags);
     }
 
+    private void parallelPrepare(int terminalFlags) {
+        AbstractPipeline backPropagationHead = sourceStage;
+        if (sourceStage.sourceAnyStateful) {
+            int depth = 1;
+            for (AbstractPipeline u = sourceStage, p = sourceStage.nextStage;
+                 p != null;
+                 u = p, p = p.nextStage) {
+                int thisOpFlags = p.sourceOrOpFlags;
+                if (p.opIsStateful()) {
+                    // If the stateful operation is a short-circuit operation
+                    // then move the back propagation head forwards
+                    // NOTE: there are no size-injecting ops
+                    if (StreamOpFlag.SHORT_CIRCUIT.isKnown(thisOpFlags)) {
+                        backPropagationHead = p;
+                    }
+
+                    depth = 0;
+                    // The following injects size, it is equivalent to:
+                    // StreamOpFlag.combineOpFlags(StreamOpFlag.IS_SIZED, p.combinedFlags);
+                    thisOpFlags = (thisOpFlags & ~StreamOpFlag.NOT_SIZED) | StreamOpFlag.IS_SIZED;
+                }
+                p.depth = depth++;
+                p.combinedFlags = StreamOpFlag.combineOpFlags(thisOpFlags, u.combinedFlags);
+            }
+        }
+
+        // Apply the upstream terminal flags
+        if (terminalFlags != 0) {
+            int upstreamTerminalFlags = terminalFlags & StreamOpFlag.UPSTREAM_TERMINAL_OP_MASK;
+            for (AbstractPipeline p = backPropagationHead; p.nextStage != null; p = p.nextStage) {
+                p.combinedFlags = StreamOpFlag.combineOpFlags(upstreamTerminalFlags, p.combinedFlags);
+            }
+
+            combinedFlags = StreamOpFlag.combineOpFlags(terminalFlags, combinedFlags);
+        }
+    }
+
     /**
      * Get the source spliterator for this pipeline stage.  For a sequential or stateless
      * parallel pipeline, this is the source spliterator.  For a stateful parallel pipeline,
      * this is a spliterator describing the results of all computations up to and including
      * the most recent stateful operation.
      */
-    private Spliterator<?> sourceSpliterator() {
+    private Spliterator<?> sourceSpliterator(int terminalFlags) {
         // Get the source spliterator of the pipeline
         Spliterator<?> spliterator = null;
         if (sourceStage.sourceSpliterator != null) {
@@ -362,6 +344,11 @@
         }
 
         if (isParallel()) {
+            // @@@ Merge parallelPrepare with the loop below and use the
+            //     spliterator characteristics to determine if SIZED
+            //     should be injected
+            parallelPrepare(terminalFlags);
+
             // Adapt the source spliterator, evaluating each stateful op
             // in the pipeline up to and including this pipeline stage
             for (AbstractPipeline u = sourceStage, p = sourceStage.nextStage, e = this;
@@ -373,10 +360,14 @@
                 }
             }
         }
+        else if (terminalFlags != 0)  {
+            combinedFlags = StreamOpFlag.combineOpFlags(terminalFlags, combinedFlags);
+        }
 
         return spliterator;
     }
 
+
     // PipelineHelper
 
     @Override
--- a/src/share/classes/java/util/stream/DoublePipeline.java	Thu Apr 04 22:28:40 2013 -0400
+++ b/src/share/classes/java/util/stream/DoublePipeline.java	Fri Apr 05 11:25:17 2013 +0200
@@ -57,8 +57,9 @@
      * @param sourceFlags The source flags for the stream source, described in
      * {@link StreamOpFlag}
      */
-    DoublePipeline(Supplier<? extends Spliterator<Double>> source, int sourceFlags) {
-        super(source, sourceFlags);
+    DoublePipeline(Supplier<? extends Spliterator<Double>> source,
+                   int sourceFlags, boolean parallel) {
+        super(source, sourceFlags, parallel);
     }
 
     /**
@@ -68,8 +69,9 @@
      * @param sourceFlags The source flags for the stream source, described in
      * {@link StreamOpFlag}
      */
-    DoublePipeline(Spliterator<Double> source, int sourceFlags) {
-        super(source, sourceFlags);
+    DoublePipeline(Spliterator<Double> source,
+                   int sourceFlags, boolean parallel) {
+        super(source, sourceFlags, parallel);
     }
 
     /**
@@ -447,9 +449,11 @@
          *
          * @param source {@code Supplier<Spliterator>} describing the stream source
          * @param sourceFlags The source flags for the stream source, described in {@link StreamOpFlag}
+         * @param parallel True if the pipeline is parallel
          */
-        Head(Supplier<? extends Spliterator<Double>> source, int sourceFlags) {
-            super(source, sourceFlags);
+        Head(Supplier<? extends Spliterator<Double>> source,
+             int sourceFlags, boolean parallel) {
+            super(source, sourceFlags, parallel);
         }
 
         /**
@@ -457,9 +461,11 @@
          *
          * @param source {@code Spliterator} describing the stream source
          * @param sourceFlags The source flags for the stream source, described in {@link StreamOpFlag}
+         * @param parallel True if the pipeline is parallel
          */
-        Head(Spliterator<Double> source, int sourceFlags) {
-            super(source, sourceFlags);
+        Head(Spliterator<Double> source,
+             int sourceFlags, boolean parallel) {
+            super(source, sourceFlags, parallel);
         }
 
         @Override
--- a/src/share/classes/java/util/stream/IntPipeline.java	Thu Apr 04 22:28:40 2013 -0400
+++ b/src/share/classes/java/util/stream/IntPipeline.java	Fri Apr 05 11:25:17 2013 +0200
@@ -55,10 +55,12 @@
      *
      * @param source {@code Supplier<Spliterator>} describing the stream source
      * @param sourceFlags The source flags for the stream source, described in
-     * {@link StreamOpFlag}
+     *        {@link StreamOpFlag}
+     * @param parallel True if the pipeline is parallel
      */
-    IntPipeline(Supplier<? extends Spliterator<Integer>> source, int sourceFlags) {
-        super(source, sourceFlags);
+    IntPipeline(Supplier<? extends Spliterator<Integer>> source,
+                int sourceFlags, boolean parallel) {
+        super(source, sourceFlags, parallel);
     }
 
     /**
@@ -66,10 +68,12 @@
      *
      * @param source {@code Spliterator} describing the stream source
      * @param sourceFlags The source flags for the stream source, described in
-     * {@link StreamOpFlag}
+     *        {@link StreamOpFlag}
+     * @param parallel True if the pipeline is parallel
      */
-    IntPipeline(Spliterator<Integer> source, int sourceFlags) {
-        super(source, sourceFlags);
+    IntPipeline(Spliterator<Integer> source,
+                int sourceFlags, boolean parallel) {
+        super(source, sourceFlags, parallel);
     }
 
     /**
@@ -475,9 +479,11 @@
          *
          * @param source {@code Supplier<Spliterator>} describing the stream source
          * @param sourceFlags The source flags for the stream source, described in {@link StreamOpFlag}
+         * @param parallel True if the pipeline is parallel
          */
-        Head(Supplier<? extends Spliterator<Integer>> source, int sourceFlags) {
-            super(source, sourceFlags);
+        Head(Supplier<? extends Spliterator<Integer>> source,
+             int sourceFlags, boolean parallel) {
+            super(source, sourceFlags, parallel);
         }
 
         /**
@@ -485,9 +491,11 @@
          *
          * @param source {@code Spliterator} describing the stream source
          * @param sourceFlags The source flags for the stream source, described in {@link StreamOpFlag}
+         * @param parallel True if the pipeline is parallel
          */
-        Head(Spliterator<Integer> source, int sourceFlags) {
-            super(source, sourceFlags);
+        Head(Spliterator<Integer> source,
+             int sourceFlags, boolean parallel) {
+            super(source, sourceFlags, parallel);
         }
 
         @Override
--- a/src/share/classes/java/util/stream/LongPipeline.java	Thu Apr 04 22:28:40 2013 -0400
+++ b/src/share/classes/java/util/stream/LongPipeline.java	Fri Apr 05 11:25:17 2013 +0200
@@ -55,20 +55,26 @@
      * Constructor for the head of a stream pipeline.
      *
      * @param source {@code Supplier<Spliterator>} describing the stream source
-     * @param sourceFlags The source flags for the stream source, described in {@link StreamOpFlag}
+     * @param sourceFlags The source flags for the stream source, described in
+     *        {@link StreamOpFlag}
+     * @param parallel True if the pipeline is parallel
      */
-    LongPipeline(Supplier<? extends Spliterator<Long>> source, int sourceFlags) {
-        super(source, sourceFlags);
+    LongPipeline(Supplier<? extends Spliterator<Long>> source,
+                 int sourceFlags, boolean parallel) {
+        super(source, sourceFlags, parallel);
     }
 
     /**
      * Constructor for the head of a stream pipeline.
      *
      * @param source {@code Spliterator} describing the stream source
-     * @param sourceFlags The source flags for the stream source, described in {@link StreamOpFlag}
+     * @param sourceFlags The source flags for the stream source, described in
+     *        {@link StreamOpFlag}
+     * @param parallel True if the pipeline is parallel
      */
-    LongPipeline(Spliterator<Long> source, int sourceFlags) {
-        super(source, sourceFlags);
+    LongPipeline(Spliterator<Long> source,
+                 int sourceFlags, boolean parallel) {
+        super(source, sourceFlags, parallel);
     }
 
     /**
@@ -457,9 +463,11 @@
          *
          * @param source {@code Supplier<Spliterator>} describing the stream source
          * @param sourceFlags The source flags for the stream source, described in {@link StreamOpFlag}
+         * @param parallel True if the pipeline is parallel
          */
-        Head(Supplier<? extends Spliterator<Long>> source, int sourceFlags) {
-            super(source, sourceFlags);
+        Head(Supplier<? extends Spliterator<Long>> source,
+             int sourceFlags, boolean parallel) {
+            super(source, sourceFlags, parallel);
         }
 
         /**
@@ -467,9 +475,11 @@
          *
          * @param source {@code Spliterator} describing the stream source
          * @param sourceFlags The source flags for the stream source, described in {@link StreamOpFlag}
+         * @param parallel True if the pipeline is parallel
          */
-        Head(Spliterator<Long> source, int sourceFlags) {
-            super(source, sourceFlags);
+        Head(Spliterator<Long> source,
+             int sourceFlags, boolean parallel) {
+            super(source, sourceFlags, parallel);
         }
 
         @Override
--- a/src/share/classes/java/util/stream/ReferencePipeline.java	Thu Apr 04 22:28:40 2013 -0400
+++ b/src/share/classes/java/util/stream/ReferencePipeline.java	Fri Apr 05 11:25:17 2013 +0200
@@ -58,11 +58,12 @@
      *
      * @param source {@code Supplier<Spliterator>} describing the stream source
      * @param sourceFlags The source flags for the stream source, described in
-     * {@link StreamOpFlag}
+     *        {@link StreamOpFlag}
+     * @param parallel True if the pipeline is parallel
      */
     ReferencePipeline(Supplier<? extends Spliterator<?>> source,
-                      int sourceFlags) {
-        super(source, sourceFlags);
+                      int sourceFlags, boolean parallel) {
+        super(source, sourceFlags, parallel);
     }
 
     /**
@@ -70,10 +71,12 @@
      *
      * @param source {@code Spliterator} describing the stream source
      * @param sourceFlags The source flags for the stream source, described in
-     * {@link StreamOpFlag}
+     *        {@link StreamOpFlag}
+     * @param parallel True if the pipeline is parallel
      */
-    ReferencePipeline(Spliterator<?> source, int sourceFlags) {
-        super(source, sourceFlags);
+    ReferencePipeline(Spliterator<?> source,
+                      int sourceFlags, boolean parallel) {
+        super(source, sourceFlags, parallel);
     }
 
     /**
@@ -479,8 +482,9 @@
          * @param source {@code Supplier<Spliterator>} describing the stream source
          * @param sourceFlags The source flags for the stream source, described in {@link StreamOpFlag}
          */
-        Head(Supplier<? extends Spliterator<?>> source, int sourceFlags) {
-            super(source, sourceFlags);
+        Head(Supplier<? extends Spliterator<?>> source,
+             int sourceFlags, boolean parallel) {
+            super(source, sourceFlags, parallel);
         }
 
         /**
@@ -489,8 +493,9 @@
          * @param source {@code Spliterator} describing the stream source
          * @param sourceFlags The source flags for the stream source, described in {@link StreamOpFlag}
          */
-        Head(Spliterator<?> source, int sourceFlags) {
-            super(source, sourceFlags);
+        Head(Spliterator<?> source,
+             int sourceFlags, boolean parallel) {
+            super(source, sourceFlags, parallel);
         }
 
         @Override
--- a/src/share/classes/java/util/stream/StreamOpFlag.java	Thu Apr 04 22:28:40 2013 -0400
+++ b/src/share/classes/java/util/stream/StreamOpFlag.java	Fri Apr 05 11:25:17 2013 +0200
@@ -51,7 +51,6 @@
  *       <th>{@code ORDERED}</th>
  *       <th>{@code SIZED}</th>
  *       <th>{@code SHORT_CIRCUIT}</th>
- *       <th>{@code PARALLEL}</th>
  *     </tr>
  *   </thead>
  *   <tbody>
@@ -62,7 +61,6 @@
  *        <td>Y</td>
  *        <td>Y</td>
  *        <td>N</td>
- *        <td>Y</td>
  *      </tr>
  *      <tr>
  *        <th colspan="2" class="tableSubHeadingColor">Intermediate operation</th>
@@ -71,34 +69,32 @@
  *        <td>PCI</td>
  *        <td>PC</td>
  *        <td>PI</td>
- *        <td>PC</td>
  *      </tr>
-  *      <tr>
+ *      <tr>
  *        <th colspan="2" class="tableSubHeadingColor">Terminal operation</th>
  *        <td>N</td>
  *        <td>N</td>
  *        <td>PC</td>
  *        <td>N</td>
  *        <td>PI</td>
- *        <td>N</td>
  *      </tr>
-*   </tbody>
-*   <tfoot>
-*       <tr>
-*         <th class="tableSubHeadingColor" colspan="2">Legend</th>
-*         <th colspan="6" rowspan="7">&nbsp;</th>
-*       </tr>
-*       <tr>
-*         <th class="tableSubHeadingColor">Flag</th>
-*         <th class="tableSubHeadingColor">Meaning</th>
-*         <th colspan="6"></th>
-*       </tr>
-*       <tr><td>Y</td><td>Allowed</td></tr>
-*       <tr><td>N</td><td>Invalid</td></tr>
-*       <tr><td>P</td><td>Preserves</td></tr>
-*       <tr><td>C</td><td>Clears</td></tr>
-*       <tr><td>I</td><td>Injects</td></tr>
-*   </tfoot>
+ *   </tbody>
+ *   <tfoot>
+ *       <tr>
+ *         <th class="tableSubHeadingColor" colspan="2">Legend</th>
+ *         <th colspan="6" rowspan="7">&nbsp;</th>
+ *       </tr>
+ *       <tr>
+ *         <th class="tableSubHeadingColor">Flag</th>
+ *         <th class="tableSubHeadingColor">Meaning</th>
+ *         <th colspan="6"></th>
+ *       </tr>
+ *       <tr><td>Y</td><td>Allowed</td></tr>
+ *       <tr><td>N</td><td>Invalid</td></tr>
+ *       <tr><td>P</td><td>Preserves</td></tr>
+ *       <tr><td>C</td><td>Clears</td></tr>
+ *       <tr><td>I</td><td>Injects</td></tr>
+ *   </tfoot>
  * </table>
  * </div>
  *
@@ -134,7 +130,7 @@
  * correct order.
  *
  * <p>
- * With the exception of {@link #PARALLEL}, stream characteristics can be
+ * With the exception of {@link #SHORT_CIRCUIT}, stream characteristics can be
  * derived from the equivalent {@link java.util.Spliterator} characteristics:
  * {@link java.util.Spliterator#DISTINCT}, {@link java.util.Spliterator#SORTED},
  * {@link java.util.Spliterator#ORDERED}, and
@@ -222,12 +218,12 @@
      * Characteristics belong to certain types, see the Type enum. Bit masks for
      * the types are constructed as per the following table:
      *
-     *                        DISTINCT  SORTED  ORDERED  SIZED  SHORT_CIRCUIT  PARALLEL
-     *          SPLITERATOR      01       01       01      01        00           00
-     *               STREAM      01       01       01      01        00           01
-     *                   OP      11       11       11      10        01           10
-     *          TERMINAL_OP      00       00       10      00        01           00
-     * UPSTREAM_TERMINAL_OP      00       00       10      00        00           00
+     *                        DISTINCT  SORTED  ORDERED  SIZED  SHORT_CIRCUIT
+     *          SPLITERATOR      01       01       01      01        00
+     *               STREAM      01       01       01      01        00
+     *                   OP      11       11       11      10        01
+     *          TERMINAL_OP      00       00       10      00        01
+     * UPSTREAM_TERMINAL_OP      00       00       10      00        00
      *
      * 01 = set/inject
      * 10 = clear
@@ -332,23 +328,12 @@
      */
     // 12, 0x01000000
     SHORT_CIRCUIT(12,
-                  set(Type.OP).set(Type.TERMINAL_OP)),
-
-
-    /**
-     * Characteristic value signifying that the stream is to be evaluated in
-     * parallel rather than sequentially.
-     * <p>
-     * A stream can have this value or an intermediate operation can preserve or
-     * clear this value.
-     */
-    // 13, 0x04000000
-    PARALLEL(13,
-             set(Type.STREAM).clear(Type.OP));
+                  set(Type.OP).set(Type.TERMINAL_OP));
 
     // The following 2 flags are currently undefined and a free for any further
     // stream flags if/when required
     //
+    // 13, 0x04000000
     // 14, 0x10000000
     // 15, 0x40000000
 
@@ -622,16 +607,6 @@
      */
     static final int IS_SHORT_CIRCUIT = SHORT_CIRCUIT.set;
 
-    /**
-     * The bit value to set {@link #PARALLEL}
-     */
-    static final int IS_PARALLEL = PARALLEL.set;
-
-    /**
-     * The bit value to clear {@link #PARALLEL}
-     */
-    static final int NOT_PARALLEL = PARALLEL.clear;
-
     private static int getMask(int flags) {
         return (flags == 0)
                ? FLAG_MASK
--- a/src/share/classes/java/util/stream/Streams.java	Thu Apr 04 22:28:40 2013 -0400
+++ b/src/share/classes/java/util/stream/Streams.java	Fri Apr 05 11:25:17 2013 +0200
@@ -112,7 +112,8 @@
                                       int characteristics) {
         Objects.requireNonNull(supplier);
         return new ReferencePipeline.Head<>(supplier,
-                                            StreamOpFlag.fromCharacteristics(characteristics) & ~StreamOpFlag.IS_PARALLEL);
+                                            StreamOpFlag.fromCharacteristics(characteristics),
+                                            false);
     }
 
     /**
@@ -145,7 +146,8 @@
                                               int characteristics) {
         Objects.requireNonNull(supplier);
         return new ReferencePipeline.Head<>(supplier,
-                                            StreamOpFlag.fromCharacteristics(characteristics) | StreamOpFlag.IS_PARALLEL);
+                                            StreamOpFlag.fromCharacteristics(characteristics),
+                                            true);
     }
 
     /**
@@ -173,7 +175,8 @@
     public static<T> Stream<T> stream(Spliterator<T> spliterator) {
         Objects.requireNonNull(spliterator);
         return new ReferencePipeline.Head<>(spliterator,
-                                            StreamOpFlag.fromCharacteristics(spliterator) & ~StreamOpFlag.IS_PARALLEL);
+                                            StreamOpFlag.fromCharacteristics(spliterator),
+                                            false);
     }
 
     /**
@@ -201,7 +204,8 @@
     public static<T> Stream<T> parallelStream(Spliterator<T> spliterator) {
         Objects.requireNonNull(spliterator);
         return new ReferencePipeline.Head<>(spliterator,
-                                            StreamOpFlag.fromCharacteristics(spliterator) | StreamOpFlag.IS_PARALLEL);
+                                            StreamOpFlag.fromCharacteristics(spliterator),
+                                            true);
     }
 
     // IntStream construction
@@ -243,7 +247,8 @@
     public static IntStream intStream(Supplier<? extends Spliterator.OfInt> supplier,
                                       int characteristics) {
         return new IntPipeline.Head<>(supplier,
-                                      StreamOpFlag.fromCharacteristics(characteristics) & ~StreamOpFlag.IS_PARALLEL);
+                                      StreamOpFlag.fromCharacteristics(characteristics),
+                                      false);
     }
 
     /**
@@ -274,7 +279,8 @@
     public static IntStream intParallelStream(Supplier<? extends Spliterator.OfInt> supplier,
                                               int characteristics) {
         return new IntPipeline.Head<>(supplier,
-                                      StreamOpFlag.fromCharacteristics(characteristics) | StreamOpFlag.IS_PARALLEL);
+                                      StreamOpFlag.fromCharacteristics(characteristics),
+                                      true);
     }
 
     /**
@@ -300,7 +306,9 @@
      * @return A new sequential {@code IntStream}
      */
     public static IntStream intStream(Spliterator.OfInt spliterator) {
-        return new IntPipeline.Head<>(spliterator, spliterator.characteristics() & ~StreamOpFlag.IS_PARALLEL);
+        return new IntPipeline.Head<>(spliterator,
+                                      StreamOpFlag.fromCharacteristics(spliterator),
+                                      false);
     }
 
     /**
@@ -327,7 +335,8 @@
      */
     public static IntStream intParallelStream(Spliterator.OfInt spliterator) {
         return new IntPipeline.Head<>(spliterator,
-                                      StreamOpFlag.fromCharacteristics(spliterator.characteristics()) | StreamOpFlag.IS_PARALLEL);
+                                      StreamOpFlag.fromCharacteristics(spliterator),
+                                      true);
     }
 
     // LongStream construction
@@ -369,7 +378,8 @@
     public static LongStream longStream(Supplier<? extends Spliterator.OfLong> supplier,
                                         int characteristics) {
         return new LongPipeline.Head<>(supplier,
-                                       StreamOpFlag.fromCharacteristics(characteristics) & ~StreamOpFlag.IS_PARALLEL);
+                                       StreamOpFlag.fromCharacteristics(characteristics),
+                                       false);
     }
 
     /**
@@ -400,7 +410,8 @@
     public static LongStream longParallelStream(Supplier<? extends Spliterator.OfLong> supplier,
                                                 int characteristics) {
         return new LongPipeline.Head<>(supplier,
-                                       StreamOpFlag.fromCharacteristics(characteristics) | StreamOpFlag.IS_PARALLEL);
+                                       StreamOpFlag.fromCharacteristics(characteristics),
+                                       true);
     }
 
     /**
@@ -427,7 +438,8 @@
      */
     public static LongStream longStream(Spliterator.OfLong spliterator) {
         return new LongPipeline.Head<>(spliterator,
-                                       StreamOpFlag.fromCharacteristics(spliterator.characteristics()) & ~StreamOpFlag.IS_PARALLEL);
+                                       StreamOpFlag.fromCharacteristics(spliterator),
+                                       false);
     }
 
     /**
@@ -454,7 +466,8 @@
      */
     public static LongStream longParallelStream(Spliterator.OfLong spliterator) {
         return new LongPipeline.Head<>(spliterator,
-                                       StreamOpFlag.fromCharacteristics(spliterator.characteristics()) | StreamOpFlag.IS_PARALLEL);
+                                       StreamOpFlag.fromCharacteristics(spliterator),
+                                       true);
     }
 
     // DoubleStream construction
@@ -496,7 +509,8 @@
     public static DoubleStream doubleStream(Supplier<? extends Spliterator.OfDouble> supplier,
                                             int characteristics) {
         return new DoublePipeline.Head<>(supplier,
-                                         StreamOpFlag.fromCharacteristics(characteristics) & ~StreamOpFlag.IS_PARALLEL);
+                                         StreamOpFlag.fromCharacteristics(characteristics),
+                                         false);
     }
 
     /**
@@ -527,7 +541,8 @@
     public static DoubleStream doubleParallelStream(Supplier<? extends Spliterator.OfDouble> supplier,
                                                     int characteristics) {
         return new DoublePipeline.Head<>(supplier,
-                                         StreamOpFlag.fromCharacteristics(characteristics) | StreamOpFlag.IS_PARALLEL);
+                                         StreamOpFlag.fromCharacteristics(characteristics),
+                                         true);
     }
 
     /**
@@ -554,7 +569,8 @@
      */
     public static DoubleStream doubleStream(Spliterator.OfDouble spliterator) {
         return new DoublePipeline.Head<>(spliterator,
-                                         StreamOpFlag.fromCharacteristics(spliterator.characteristics()) & ~StreamOpFlag.IS_PARALLEL);
+                                         StreamOpFlag.fromCharacteristics(spliterator),
+                                         false);
     }
 
     /**
@@ -581,7 +597,8 @@
      */
     public static DoubleStream doubleParallelStream(Spliterator.OfDouble spliterator) {
         return new DoublePipeline.Head<>(spliterator,
-                                         StreamOpFlag.fromCharacteristics(spliterator.characteristics()) | StreamOpFlag.IS_PARALLEL);
+                                         StreamOpFlag.fromCharacteristics(spliterator),
+                                         true);
     }
 
     // Infinite Stream generators
--- a/test-ng/bootlib/java/util/stream/CollectorOps.java	Thu Apr 04 22:28:40 2013 -0400
+++ b/test-ng/bootlib/java/util/stream/CollectorOps.java	Fri Apr 05 11:25:17 2013 +0200
@@ -32,15 +32,11 @@
 public final class CollectorOps {
     private CollectorOps() { }
 
-    public static <E_IN> StatefulOp<E_IN> sequentialCollector() {
-        return new StatefulCollector<>(StreamOpFlag.NOT_PARALLEL, StreamShape.REFERENCE);
-    }
-
-    public static <E_IN> StatefulOp<E_IN> parallelCollector() {
+    public static <E_IN> StatefulTestOp<E_IN> collector() {
         return new StatefulCollector<>(0, StreamShape.REFERENCE);
     }
 
-    public static class StatefulCollector<E_IN> implements StatefulOp<E_IN> {
+    public static class StatefulCollector<E_IN> implements StatefulTestOp<E_IN> {
         private final int opFlags;
         private final StreamShape inputShape;
 
@@ -65,7 +61,7 @@
         }
 
         @Override
-        public Sink<E_IN> opWrapSink(int flags, Sink<E_IN> sink) {
+        public Sink<E_IN> opWrapSink(int flags, boolean parallel, Sink<E_IN> sink) {
             return sink;
         }
 
--- a/test-ng/bootlib/java/util/stream/FlagDeclaringOp.java	Thu Apr 04 22:28:40 2013 -0400
+++ b/test-ng/bootlib/java/util/stream/FlagDeclaringOp.java	Fri Apr 05 11:25:17 2013 +0200
@@ -29,7 +29,7 @@
  *
  */
 @SuppressWarnings({"rawtypes", "unchecked"})
-public class FlagDeclaringOp<T> implements IntermediateOp<T, T> {
+public class FlagDeclaringOp<T> implements StatelessTestOp<T, T> {
     private final int flags;
     private final StreamShape shape;
 
@@ -58,7 +58,7 @@
     }
 
     @Override
-    public Sink<T> opWrapSink(int flags, Sink sink) {
+    public Sink<T> opWrapSink(int flags, boolean parallel, Sink sink) {
         return sink;
     }
 }
--- a/test-ng/bootlib/java/util/stream/IntermediateOp.java	Thu Apr 04 22:28:40 2013 -0400
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,235 +0,0 @@
-/*
- * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This code is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 only, as
- * published by the Free Software Foundation.  Oracle designates this
- * particular file as subject to the "Classpath" exception as provided
- * by Oracle in the LICENSE file that accompanied this code.
- *
- * This code is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
- * version 2 for more details (a copy is included in the LICENSE file that
- * accompanied this code).
- *
- * You should have received a copy of the GNU General Public License version
- * 2 along with this work; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
- *
- * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
- * or visit www.oracle.com if you need additional information or have any
- * questions.
- */
-package java.util.stream;
-
-import java.util.Spliterator;
-import java.util.function.IntFunction;
-
-/**
- * An operation in a stream pipeline that takes a stream as input and produces
- * a stream, possibly of a different type, as output.  An intermediate operation
- * has an input type and an output type, reflected in its type parameters
- * {@code E_IN} and {@code E_OUT}, and, an associated input shape and
- * output shape.  An intermediate operation also has a set of <em>operation
- * flags</em> that describes how it transforms characteristics of the stream
- * (such as sortedness or size; see {@link StreamOpFlag}).
- *
- * <p>Intermediate operations are implemented in terms of <em>sink transforms
- * </em>; given a {@code Sink} for the output type of the operation, produce a
- * {@code Sink} for the input type of the operation, which, when fed with
- * values, has the effect of implementing the desired operation on the input
- * values and feeding them to the output sink.
- *
- * <p>Some intermediate operations are <em>stateful</em>.  This means that the
- * sinks they produce as a result of the above wrapping may maintain state from
- * processing earlier elements.  Stateful intermediate operations must implement
- * the {@link StatefulOp} interface.  Statefulness has an effect on how the
- * operation can be parallelized.  Stateless operations parallelize trivially
- * because they are homomorphisms under concatenation:
- *
- * <pre>
- *     statelessOp(a || b) = statelessOp(a) || statelessOp(b)
- * </pre>
- *
- * where {@code ||} denotes concatenation.  Stateful operations may still be
- * parallelizable, but are not amenable to the automatic parallelization of
- * stateless operations.  Accordingly, a stateful operation must provide its own
- * parallel execution implementation
- * ({@link IntermediateOp#opEvaluateParallel(PipelineHelper, java.util.Spliterator, java.util.function.IntFunction)}).
- *
- * @apiNote
- * As an example, consider the stream pipeline:
- * <pre>
- *     int oldestBob = people.stream()
- *                            .filter(p -> p.getFirstName.equals("Bob"))
- *                            .mapToInt(p -> p.getAge())
- *                            .max();
- * </pre>
- *
- * <p>This pipeline has two intermediate operations, filter and map.  The
- * filtering operation has input and output types of {@code Person} (with input
- * and output shape of {@code REFERENCE}), and the mapping operation has an
- * input type of {@code Person} and an output type of {@code Integer} (with
- * shape {@code INT_VALUE}.)  When we construct a sink chain, the mapping
- * operation will be asked to transform a {@code Sink.OfInt} which computes the
- * maximum value into a {@code Sink} which accepts {@code Person} objects, and
- * whose behavior is to take the supplied {@code Person}, call {@code getAge()}
- * on it, and pass the resulting value to the downstream sink.  This sink
- * transform might be implement as:
- *
- * <pre>
- *     new Sink.ChainedReference<U>(sink) {
- *         public void accept(U u) {
- *             downstream.accept(mappingFunction.applyAsInt(u));
- *         }
- *     }
- * </pre>
- *
- * @param <E_IN>  Type of input elements to the operation
- * @param <E_OUT> Type of output elements to the operation
- * @see TerminalOp
- * @see StatefulOp
- * @since 1.8
- */
-interface IntermediateOp<E_IN, E_OUT> {
-
-    @SuppressWarnings({"rawtypes", "unchecked"})
-    public static<T> AbstractPipeline chain(AbstractPipeline upstream,
-                                            IntermediateOp<?, T> op) {
-        if (op instanceof StatefulOp)
-            return StatefulOp.chain(upstream, (StatefulOp) op);
-        switch (op.outputShape()) {
-            case REFERENCE:
-                return new ReferencePipeline.StatelessOp<Object, T>(upstream, op.inputShape(), op.opGetFlags()) {
-                    public Sink opWrapSink(int flags, Sink<T> sink) {
-                        return op.opWrapSink(flags, sink);
-                    }
-                };
-            case INT_VALUE:
-                return new IntPipeline.StatelessOp<Object>(upstream, op.inputShape(), op.opGetFlags()) {
-                    public Sink opWrapSink(int flags, Sink sink) {
-                        return op.opWrapSink(flags, sink);
-                    }
-                };
-            case LONG_VALUE:
-                return new LongPipeline.StatelessOp<Object>(upstream, op.inputShape(), op.opGetFlags()) {
-                    @Override
-                    Sink opWrapSink(int flags, Sink sink) {
-                        return op.opWrapSink(flags, sink);
-                    }
-                };
-            case DOUBLE_VALUE:
-                return new DoublePipeline.StatelessOp<Object>(upstream, op.inputShape(), op.opGetFlags()) {
-                    @Override
-                    Sink opWrapSink(int flags, Sink sink) {
-                        return op.opWrapSink(flags, sink);
-                    }
-                };
-            default: throw new IllegalStateException(op.outputShape().toString());
-        }
-    }
-
-
-    /**
-     * Gets the shape of the input type of this operation
-     *
-     * @implSpec The default returns {@code StreamShape.REFERENCE}
-     * @return Shape of the input type of this operation
-     */
-    default StreamShape inputShape() { return StreamShape.REFERENCE; }
-
-    /**
-     * Gets the shape of the output type of this operation
-     *
-     * @implSpec The default returns {@code StreamShape.REFERENCE}
-     * @return Shape of the output type of this operation
-     */
-    default StreamShape outputShape() { return StreamShape.REFERENCE; }
-
-    /**
-     * Gets the operation flags of this operation.
-     *
-     * @implSpec The default returns {@code 0}
-     * @return a bitmap describing the operation flags of this operation
-     * @see StreamOpFlag
-     */
-    default int opGetFlags() { return 0; }
-
-    /**
-     * Returns whether this operation is stateful or not.  If it is stateful,
-     * then the method
-     * {@link #opEvaluateParallel(PipelineHelper, java.util.Spliterator, java.util.function.IntFunction)}
-     * must be overridden.
-     *
-     * @implSpec The default implementation returns {@code false}.
-     * @return {@code true} if this operation is stateful
-     */
-    default boolean opIsStateful() { return false; }
-
-    /**
-     * Accepts a {@code Sink} which will receive the results of this operation,
-     * and return a {@code Sink} which accepts elements of the input type of
-     * this operation and which performs the operation, passing the results to
-     * the provided {@code Sink}.
-     *
-     * <p>The implementation may use the {@code flags} parameter to optimize the
-     * sink wrapping.  For example, if the input is already {@code DISTINCT},
-     * the implementation for the {@code Stream#distinct()} method could just
-     * return the sink it was passed.
-     *
-     * @param flags The combined stream and operation flags up to, but not
-     *        including, this operation.
-     * @param sink elements will be sent to this sink after the processing.
-     * @return a sink which will accept elements and perform the operation upon
-     *         each element, passing the results (if any) to the provided
-     *         {@code Sink}.
-     */
-    Sink<E_IN> opWrapSink(int flags, Sink<E_OUT> sink);
-
-    /**
-     * Performs a parallel evaluation of the operation using the specified
-     * {@code PipelineHelper} which describes the stream source and upstream
-     * intermediate operations.  Only called on stateful operations.  If
-     * {@link #opIsStateful()} returns true then implementations must override the
-     * default implementation.
-     *
-     * @implSpec The default implementation throws an
-     * {@link UnsupportedOperationException}
-     *
-     * @param helper the pipeline helper
-     * @param spliterator the source {@code Spliterator}
-     * @param generator the array generator
-     * @return a {@code Node} describing the result of the evaluation
-     */
-    default <P_IN> Node<E_OUT> opEvaluateParallel(PipelineHelper<E_OUT> helper,
-                                                  Spliterator<P_IN> spliterator,
-                                                  IntFunction<E_OUT[]> generator) {
-        throw new UnsupportedOperationException("Parallel evaluation is not supported");
-    }
-
-    /**
-     * Returns a {@code Spliterator} describing a parallel evaluation of the operation using
-     * the specified {@code PipelineHelper} which describes the stream source and upstream
-     * intermediate operations.  Only called on stateful operations.  It is not necessary
-     * (though acceptable) to do a full computation of the result here; it is preferable, if
-     * possible, to describe the result via a lazily evaluated spliterator.
-     *
-     * @implSpec The default implementation behaves as if:
-     * <pre>{@code
-     *     return evaluateParallel(helper, i -> (E_OUT[]) new Object[i]).spliterator();
-     * }</pre>
-     * and is suitable for implementations that cannot do better than a full synchronous
-     * evaluation.
-     *
-     * @param helper the pipeline helper
-     * @param spliterator the source {@code Spliterator}
-     * @return a {@code Spliterator} describing the result of the evaluation
-     */
-    @SuppressWarnings("unchecked")
-    default <P_IN> Spliterator<E_OUT> opEvaluateParallelLazy(PipelineHelper<E_OUT> helper,
-                                                             Spliterator<P_IN> spliterator) {
-        return opEvaluateParallel(helper, spliterator, i -> (E_OUT[]) new Object[i]).spliterator();
-    }
-}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/test-ng/bootlib/java/util/stream/IntermediateTestOp.java	Fri Apr 05 11:25:17 2013 +0200
@@ -0,0 +1,43 @@
+/*
+ * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.  Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+package java.util.stream;
+
+/**
+ * A base type for test operations
+ */
+interface IntermediateTestOp<E_IN, E_OUT> {
+
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    public static<T> AbstractPipeline chain(AbstractPipeline upstream,
+                                            IntermediateTestOp<?, T> op) {
+        if (op instanceof StatelessTestOp)
+            return StatelessTestOp.chain(upstream, (StatelessTestOp) op);
+
+        if (op instanceof StatefulTestOp)
+            return StatefulTestOp.chain(upstream, (StatefulTestOp) op);
+
+        throw new IllegalStateException("Unknown test op type: " + op.getClass().getName());
+    }
+}
--- a/test-ng/bootlib/java/util/stream/LambdaTestHelpers.java	Thu Apr 04 22:28:40 2013 -0400
+++ b/test-ng/bootlib/java/util/stream/LambdaTestHelpers.java	Fri Apr 05 11:25:17 2013 +0200
@@ -449,7 +449,6 @@
         if (StreamOpFlag.SIZED.isKnown(flags)) sj.add("IS_SIZED");
         if (StreamOpFlag.SORTED.isKnown(flags)) sj.add("IS_SORTED");
         if (StreamOpFlag.SHORT_CIRCUIT.isKnown(flags)) sj.add("IS_SHORT_CIRCUIT");
-        if (StreamOpFlag.PARALLEL.isKnown(flags)) sj.add("IS_PARALLEL");
         return sj.toString();
     }
 }
--- a/test-ng/bootlib/java/util/stream/OpTestCase.java	Thu Apr 04 22:28:40 2013 -0400
+++ b/test-ng/bootlib/java/util/stream/OpTestCase.java	Fri Apr 05 11:25:17 2013 +0200
@@ -118,7 +118,7 @@
             this.data = Objects.requireNonNull(data);
         }
 
-        public <U, S_OUT extends BaseStream<U, S_OUT>> ExerciseDataStreamBuilder<T, U, S_IN, S_OUT> ops(IntermediateOp... ops) {
+        public <U, S_OUT extends BaseStream<U, S_OUT>> ExerciseDataStreamBuilder<T, U, S_IN, S_OUT> ops(IntermediateTestOp... ops) {
             return new ExerciseDataStreamBuilder<>(data, (S_IN s) -> (S_OUT) chain(s, ops));
         }
 
@@ -128,7 +128,7 @@
         }
 
         public <U, S_OUT extends BaseStream<U, S_OUT>> ExerciseDataStreamBuilder<T, U, S_IN, S_OUT>
-        stream(Function<S_IN, S_OUT> m, IntermediateOp<U, U> additionalOp) {
+        stream(Function<S_IN, S_OUT> m, IntermediateTestOp<U, U> additionalOp) {
             return new ExerciseDataStreamBuilder<>(data, s -> (S_OUT) chain(m.apply(s), additionalOp));
         }
 
@@ -332,10 +332,10 @@
 
     static enum TerminalTestScenario {
         SINGLE_SEQUENTIAL,
-        SINGLE_SEQUENTIAL_PULL,
+        SINGLE_SEQUENTIAL_SHORT_CIRCUIT,
         SINGLE_PARALLEL,
         ALL_SEQUENTIAL,
-        ALL_SEQUENTIAL_PULL,
+        ALL_SEQUENTIAL_SHORT_CIRCUIT,
         ALL_PARALLEL,
         ALL_PARALLEL_SEQUENTIAL,
     }
@@ -419,21 +419,24 @@
             if (refResult == null) {
                 // Sequentially collect the output that will be input to the terminal op
                 refResult = terminalF.apply((S_OUT) createPipeline(shape, node.spliterator(),
-                                                                   StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SIZED));
+                                                                   StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SIZED,
+                                                                   false));
             } else if (testSet.contains(TerminalTestScenario.SINGLE_SEQUENTIAL)) {
                 S_OUT source = (S_OUT) createPipeline(shape, node.spliterator(),
-                                                      StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SIZED);
+                                                      StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SIZED,
+                                                      false);
                 BiConsumer<R, R> asserter = sequentialEqualityAsserter.apply(source);
                 R result = terminalF.apply(source);
                 LambdaTestHelpers.launderAssertion(() -> asserter.accept(refResult, result),
                                                    () -> String.format("Single sequential: %s != %s", refResult, result));
             }
 
-            if (testSet.contains(TerminalTestScenario.SINGLE_SEQUENTIAL_PULL)) {
+            if (testSet.contains(TerminalTestScenario.SINGLE_SEQUENTIAL_SHORT_CIRCUIT)) {
                 S_OUT source = (S_OUT) createPipeline(shape, node.spliterator(),
-                                                      StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SIZED);
-                // Force pull mode
-                source = (S_OUT) chain(source, new PullOnlyOp<U>(shape));
+                                                      StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SIZED,
+                                                      false);
+                // Force short-curcuit
+                source = (S_OUT) chain(source, new ShortCircuitOp<U>(shape));
                 BiConsumer<R, R> asserter = sequentialEqualityAsserter.apply(source);
                 R result = terminalF.apply(source);
                 LambdaTestHelpers.launderAssertion(() -> asserter.accept(refResult, result),
@@ -442,7 +445,8 @@
 
             if (testSet.contains(TerminalTestScenario.SINGLE_PARALLEL)) {
                 S_OUT source = (S_OUT) createPipeline(shape, node.spliterator(),
-                                                      StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SIZED | StreamOpFlag.IS_PARALLEL);
+                                                      StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SIZED,
+                                                      true);
                 BiConsumer<R, R> asserter = parallelEqualityAsserter.apply(source);
                 R result = terminalF.apply(source);
                 LambdaTestHelpers.launderAssertion(() -> asserter.accept(refResult, result),
@@ -450,8 +454,7 @@
             }
 
             if (testSet.contains(TerminalTestScenario.ALL_SEQUENTIAL)) {
-                // This may push or pull depending on the terminal op implementation
-
+                // This may forEach or tryAdvance depending on the terminal op implementation
                 S_OUT source = streamF.apply(data.stream());
                 BiConsumer<R, R> asserter = sequentialEqualityAsserter.apply(source);
                 R result = terminalF.apply(source);
@@ -459,10 +462,10 @@
                                                    () -> String.format("All sequential: %s != %s", refResult, result));
             }
 
-            if (testSet.contains(TerminalTestScenario.ALL_SEQUENTIAL_PULL)) {
+            if (testSet.contains(TerminalTestScenario.ALL_SEQUENTIAL_SHORT_CIRCUIT)) {
                 S_OUT source = streamF.apply(data.stream());
-                // Force pull mode
-                source = (S_OUT) chain(source, new PullOnlyOp<U>(shape));
+                // Force short-curcuit
+                source = (S_OUT) chain(source, new ShortCircuitOp<U>(shape));
                 BiConsumer<R, R> asserter = sequentialEqualityAsserter.apply(source);
                 R result = terminalF.apply(source);
                 LambdaTestHelpers.launderAssertion(() -> asserter.accept(refResult, result),
@@ -488,12 +491,12 @@
             return refResult;
         }
 
-        AbstractPipeline createPipeline(StreamShape shape, Spliterator s, int flags) {
+        AbstractPipeline createPipeline(StreamShape shape, Spliterator s, int flags, boolean parallel) {
             switch (shape) {
-                case REFERENCE:    return new ReferencePipeline.Head<>(s, flags);
-                case INT_VALUE:    return new IntPipeline.Head(s, flags);
-                case LONG_VALUE:   return new LongPipeline.Head(s, flags);
-                case DOUBLE_VALUE: return new DoublePipeline.Head(s, flags);
+                case REFERENCE:    return new ReferencePipeline.Head<>(s, flags, parallel);
+                case INT_VALUE:    return new IntPipeline.Head(s, flags, parallel);
+                case LONG_VALUE:   return new LongPipeline.Head(s, flags, parallel);
+                case DOUBLE_VALUE: return new DoublePipeline.Head(s, flags, parallel);
                 default: throw new IllegalStateException("Unknown shape: " + shape);
             }
         }
@@ -521,24 +524,24 @@
     //
 
     @SuppressWarnings({"rawtypes", "unchecked"})
-    private static <T> AbstractPipeline<?, T, ?> chain(AbstractPipeline upstream, IntermediateOp<?, T> op) {
-        return (AbstractPipeline<?, T, ?>) IntermediateOp.chain(upstream, op);
+    private static <T> AbstractPipeline<?, T, ?> chain(AbstractPipeline upstream, IntermediateTestOp<?, T> op) {
+        return (AbstractPipeline<?, T, ?>) IntermediateTestOp.chain(upstream, op);
     }
 
     @SuppressWarnings({"rawtypes", "unchecked"})
-    private static AbstractPipeline<?, ?, ?> chain(AbstractPipeline pipe, IntermediateOp... ops) {
-        for (IntermediateOp op : ops)
+    private static AbstractPipeline<?, ?, ?> chain(AbstractPipeline pipe, IntermediateTestOp... ops) {
+        for (IntermediateTestOp op : ops)
             pipe = chain(pipe, op);
         return pipe;
     }
 
     @SuppressWarnings("rawtypes")
-    private static <T> AbstractPipeline<?, T, ?> chain(BaseStream pipe, IntermediateOp<?, T> op) {
+    private static <T> AbstractPipeline<?, T, ?> chain(BaseStream pipe, IntermediateTestOp<?, T> op) {
         return chain((AbstractPipeline) pipe, op);
     }
 
     @SuppressWarnings("rawtypes")
-    public static AbstractPipeline<?, ?, ?> chain(BaseStream pipe, IntermediateOp... ops) {
+    public static AbstractPipeline<?, ?, ?> chain(BaseStream pipe, IntermediateTestOp... ops) {
         return chain((AbstractPipeline) pipe, ops);
     }
 
@@ -569,15 +572,15 @@
         S parallelStream();
     }
 
-    private class PullOnlyOp<T> implements IntermediateOp<T,T> {
+    private class ShortCircuitOp<T> implements StatelessTestOp<T,T> {
         private final StreamShape shape;
 
-        private PullOnlyOp(StreamShape shape) {
+        private ShortCircuitOp(StreamShape shape) {
             this.shape = shape;
         }
 
         @Override
-        public Sink<T> opWrapSink(int flags, Sink<T> sink) {
+        public Sink<T> opWrapSink(int flags, boolean parallel, Sink<T> sink) {
             return sink;
         }
 
--- a/test-ng/bootlib/java/util/stream/StatefulOp.java	Thu Apr 04 22:28:40 2013 -0400
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,125 +0,0 @@
-/*
- * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This code is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 only, as
- * published by the Free Software Foundation.  Oracle designates this
- * particular file as subject to the "Classpath" exception as provided
- * by Oracle in the LICENSE file that accompanied this code.
- *
- * This code is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
- * version 2 for more details (a copy is included in the LICENSE file that
- * accompanied this code).
- *
- * You should have received a copy of the GNU General Public License version
- * 2 along with this work; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
- *
- * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
- * or visit www.oracle.com if you need additional information or have any
- * questions.
- */
-package java.util.stream;
-
-import java.util.Spliterator;
-import java.util.function.IntFunction;
-
-/**
- * A stateful intermediate stream operation ({@link IntermediateOp}).
- * <em>Stateful</em> means that state is accumulated as elements are processed.
- * Examples of stateful operations are sorting, extracting a subsequence of the
- * input, or removing duplicates.  Statefulness has an effect on how the
- * operation can be parallelized.  Stateless operations parallelize trivially
- * because they are homomorphisms under concatenation:
- *
- * <pre>
- *     statelessOp(a || b) = statelessOp(a) || statelessOp(b)
- * </pre>
- *
- * where {@code ||} denotes concatenation.  Stateful operations may still be
- * parallelizable, but are not amenable to the automatic parallelization of
- * stateless operations.  Accordingly, a stateful operation must provide its
- * own parallel execution implementation
- * ({@link IntermediateOp#opEvaluateParallel(PipelineHelper, java.util.Spliterator, java.util.function.IntFunction)})
- * as well as {@link IntermediateOp#opWrapSink(int, Sink)}.
- *
- * @param <E> Type of input and output elements.
- *
- * @see IntermediateOp
- * @see TerminalOp
- * @since 1.8
- */
-interface StatefulOp<E> extends IntermediateOp<E, E> {
-
-    @SuppressWarnings({"rawtypes", "unchecked"})
-    public static<T> AbstractPipeline chain(AbstractPipeline upstream,
-                                            StatefulOp op) {
-        switch (op.outputShape()) {
-            case REFERENCE:
-                return new ReferencePipeline.StatefulOp<Object, T>(upstream, op.inputShape(), op.opGetFlags()) {
-                    @Override
-                    Sink opWrapSink(int flags, Sink sink) {
-                        return op.opWrapSink(flags, sink);
-                    }
-
-                    @Override
-                    <P_IN> Node<T> opEvaluateParallel(PipelineHelper<T> helper, Spliterator<P_IN> spliterator, IntFunction<T[]> generator) {
-                        return op.opEvaluateParallel(helper, spliterator, generator);
-                    }
-                };
-            case INT_VALUE:
-                return new IntPipeline.StatefulOp<Object>(upstream, op.inputShape(), op.opGetFlags()) {
-                    @Override
-                    Sink opWrapSink(int flags, Sink sink) {
-                        return op.opWrapSink(flags, sink);
-                    }
-
-                    @Override
-                    <P_IN> Node<Integer> opEvaluateParallel(PipelineHelper<Integer> helper, Spliterator<P_IN> spliterator, IntFunction<Integer[]> generator) {
-                        return (Node<Integer>) op.opEvaluateParallel((PipelineHelper<T>) helper, spliterator, null);
-                    }
-                };
-            case LONG_VALUE:
-                return new LongPipeline.StatefulOp<Object>(upstream, op.inputShape(), op.opGetFlags()) {
-                    @Override
-                    Sink opWrapSink(int flags, Sink sink) {
-                        return op.opWrapSink(flags, sink);
-                    }
-
-                    @Override
-                    <P_IN> Node<Long> opEvaluateParallel(PipelineHelper<Long> helper, Spliterator<P_IN> spliterator, IntFunction<Long[]> generator) {
-                        return (Node<Long>) op.opEvaluateParallel((PipelineHelper<T>) helper, spliterator, null);
-                    }
-                };
-            case DOUBLE_VALUE:
-                return new DoublePipeline.StatefulOp<Object>(upstream, op.inputShape(), op.opGetFlags()) {
-                    @Override
-                    Sink opWrapSink(int flags, Sink sink) {
-                        return op.opWrapSink(flags, sink);
-                    }
-
-                    @Override
-                    <P_IN> Node<Double> opEvaluateParallel(PipelineHelper<Double> helper, Spliterator<P_IN> spliterator, IntFunction<Double[]> generator) {
-                        return (Node<Double>) op.opEvaluateParallel((PipelineHelper<T>) helper, spliterator, null);
-                    }
-                };
-            default: throw new IllegalStateException(op.outputShape().toString());
-        }
-    }
-
-    /**
-     * Returns {@code true}.  Any overriding implementations must also return
-     * {@code true}
-     * @implSpec The default implementation returns {@code true}
-     * @return {@code true}
-     */
-    @Override
-    default boolean opIsStateful() {
-        return true;
-    }
-
-    <P_IN> Node<E> opEvaluateParallel(PipelineHelper<E> helper, Spliterator<P_IN> spliterator, IntFunction<E[]> generator);
-}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/test-ng/bootlib/java/util/stream/StatefulTestOp.java	Fri Apr 05 11:25:17 2013 +0200
@@ -0,0 +1,140 @@
+/*
+ * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.  Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+package java.util.stream;
+
+import java.util.Spliterator;
+import java.util.function.IntFunction;
+
+/**
+ * The base type for a stateful test operation.
+ */
+interface StatefulTestOp<E> extends IntermediateTestOp<E, E> {
+
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    public static<T> AbstractPipeline chain(AbstractPipeline upstream,
+                                            StatefulTestOp op) {
+        switch (op.outputShape()) {
+            case REFERENCE:
+                return new ReferencePipeline.StatefulOp<Object, T>(upstream, op.inputShape(), op.opGetFlags()) {
+                    @Override
+                    Sink opWrapSink(int flags, Sink sink) {
+                        return op.opWrapSink(flags, isParallel(), sink);
+                    }
+
+                    @Override
+                    <P_IN> Spliterator<T> opEvaluateParallelLazy(PipelineHelper<T> helper,
+                                                                 Spliterator<P_IN> spliterator) {
+                        return op.opEvaluateParallelLazy(helper, spliterator);
+                    }
+
+                    @Override
+                    <P_IN> Node<T> opEvaluateParallel(PipelineHelper<T> helper,
+                                                      Spliterator<P_IN> spliterator,
+                                                      IntFunction<T[]> generator) {
+                        return op.opEvaluateParallel(helper, spliterator, generator);
+                    }
+                };
+            case INT_VALUE:
+                return new IntPipeline.StatefulOp<Object>(upstream, op.inputShape(), op.opGetFlags()) {
+                    @Override
+                    Sink opWrapSink(int flags, Sink sink) {
+                        return op.opWrapSink(flags, isParallel(), sink);
+                    }
+
+                    @Override
+                    <P_IN> Spliterator<Integer> opEvaluateParallelLazy(PipelineHelper<Integer> helper,
+                                                                 Spliterator<P_IN> spliterator) {
+                        return op.opEvaluateParallelLazy(helper, spliterator);
+                    }
+
+                    @Override
+                    <P_IN> Node<Integer> opEvaluateParallel(PipelineHelper<Integer> helper,
+                                                            Spliterator<P_IN> spliterator,
+                                                            IntFunction<Integer[]> generator) {
+                        return (Node<Integer>) op.opEvaluateParallel(helper, spliterator, generator);
+                    }
+                };
+            case LONG_VALUE:
+                return new LongPipeline.StatefulOp<Object>(upstream, op.inputShape(), op.opGetFlags()) {
+                    @Override
+                    Sink opWrapSink(int flags, Sink sink) {
+                        return op.opWrapSink(flags, isParallel(), sink);
+                    }
+
+                    @Override
+                    <P_IN> Spliterator<Long> opEvaluateParallelLazy(PipelineHelper<Long> helper,
+                                                                 Spliterator<P_IN> spliterator) {
+                        return op.opEvaluateParallelLazy(helper, spliterator);
+                    }
+
+                    @Override
+                    <P_IN> Node<Long> opEvaluateParallel(PipelineHelper<Long> helper,
+                                                         Spliterator<P_IN> spliterator,
+                                                         IntFunction<Long[]> generator) {
+                        return (Node<Long>) op.opEvaluateParallel(helper, spliterator, generator);
+                    }
+                };
+            case DOUBLE_VALUE:
+                return new DoublePipeline.StatefulOp<Object>(upstream, op.inputShape(), op.opGetFlags()) {
+                    @Override
+                    Sink opWrapSink(int flags, Sink sink) {
+                        return op.opWrapSink(flags, isParallel(), sink);
+                    }
+
+                    @Override
+                    <P_IN> Spliterator<Double> opEvaluateParallelLazy(PipelineHelper<Double> helper,
+                                                                    Spliterator<P_IN> spliterator) {
+                        return op.opEvaluateParallelLazy(helper, spliterator);
+                    }
+
+                    @Override
+                    <P_IN> Node<Double> opEvaluateParallel(PipelineHelper<Double> helper,
+                                                           Spliterator<P_IN> spliterator,
+                                                           IntFunction<Double[]> generator) {
+                        return (Node<Double>) op.opEvaluateParallel(helper, spliterator, generator);
+                    }
+                };
+            default: throw new IllegalStateException(op.outputShape().toString());
+        }
+    }
+
+    default StreamShape inputShape() { return StreamShape.REFERENCE; }
+
+    default StreamShape outputShape() { return StreamShape.REFERENCE; }
+
+    default int opGetFlags() { return 0; }
+
+    Sink<E> opWrapSink(int flags, boolean parallel, Sink<E> sink);
+
+    @SuppressWarnings("unchecked")
+    default <P_IN> Spliterator<E> opEvaluateParallelLazy(PipelineHelper<E> helper,
+                                                         Spliterator<P_IN> spliterator) {
+        return opEvaluateParallel(helper, spliterator, i -> (E[]) new Object[i]).spliterator();
+    }
+
+    <P_IN> Node<E> opEvaluateParallel(PipelineHelper<E> helper,
+                                      Spliterator<P_IN> spliterator,
+                                      IntFunction<E[]> generator);
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/test-ng/bootlib/java/util/stream/StatelessTestOp.java	Fri Apr 05 11:25:17 2013 +0200
@@ -0,0 +1,74 @@
+/*
+ * Copyright (c) 2013, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.  Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+package java.util.stream;
+
+/**
+ * The base type of a stateless test operation
+ */
+interface StatelessTestOp<E_IN, E_OUT> extends IntermediateTestOp<E_IN, E_OUT> {
+
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    public static<T> AbstractPipeline chain(AbstractPipeline upstream,
+                                            StatelessTestOp<?, T> op) {
+        switch (op.outputShape()) {
+            case REFERENCE:
+                return new ReferencePipeline.StatelessOp<Object, T>(upstream, op.inputShape(), op.opGetFlags()) {
+                    public Sink opWrapSink(int flags, Sink<T> sink) {
+                        return op.opWrapSink(flags, isParallel(), sink);
+                    }
+                };
+            case INT_VALUE:
+                return new IntPipeline.StatelessOp<Object>(upstream, op.inputShape(), op.opGetFlags()) {
+                    public Sink opWrapSink(int flags, Sink sink) {
+                        return op.opWrapSink(flags, isParallel(), sink);
+                    }
+                };
+            case LONG_VALUE:
+                return new LongPipeline.StatelessOp<Object>(upstream, op.inputShape(), op.opGetFlags()) {
+                    @Override
+                    Sink opWrapSink(int flags, Sink sink) {
+                        return op.opWrapSink(flags, isParallel(), sink);
+                    }
+                };
+            case DOUBLE_VALUE:
+                return new DoublePipeline.StatelessOp<Object>(upstream, op.inputShape(), op.opGetFlags()) {
+                    @Override
+                    Sink opWrapSink(int flags, Sink sink) {
+                        return op.opWrapSink(flags, isParallel(), sink);
+                    }
+                };
+            default: throw new IllegalStateException(op.outputShape().toString());
+        }
+    }
+
+    default StreamShape inputShape() { return StreamShape.REFERENCE; }
+
+    default StreamShape outputShape() { return StreamShape.REFERENCE; }
+
+    default int opGetFlags() { return 0; }
+
+    Sink<E_IN> opWrapSink(int flags, boolean parallel, Sink<E_OUT> sink);
+}
+
--- a/test-ng/bootlib/java/util/stream/TestFlagExpectedOp.java	Thu Apr 04 22:28:40 2013 -0400
+++ b/test-ng/bootlib/java/util/stream/TestFlagExpectedOp.java	Fri Apr 05 11:25:17 2013 +0200
@@ -100,7 +100,7 @@
 
     @Override
     @SuppressWarnings({"rawtypes", "unchecked"})
-    public Sink<T> opWrapSink(int flags, Sink upstream) {
+    public Sink<T> opWrapSink(int flags, boolean parallel, Sink upstream) {
         assertFlags(flags);
         return upstream;
     }
--- a/test-ng/boottests/java/util/stream/FlagOpTest.java	Thu Apr 04 22:28:40 2013 -0400
+++ b/test-ng/boottests/java/util/stream/FlagOpTest.java	Fri Apr 05 11:25:17 2013 +0200
@@ -70,7 +70,7 @@
 
         @Override
         @SuppressWarnings({"unchecked", "rawtypes"})
-        public Sink<T> opWrapSink(int flags, Sink sink) {
+        public Sink<T> opWrapSink(int flags, boolean parallel, Sink sink) {
             this.wrapFlags = flags;
 
             if (downstream != null) {
@@ -158,19 +158,15 @@
                 exercise();
     }
 
-    public void testFlagsSetParallelCollect() {
-        testFlagsSetSequence(CollectorOps::parallelCollector);
+    public void testFlagsParallelCollect() {
+        testFlagsSetSequence(CollectorOps::collector);
     }
 
-    public void testFlagsSetSequentialCollect() {
-        testFlagsSetSequence(() -> CollectorOps.sequentialCollector());
-    }
-
-    private void testFlagsSetSequence(Supplier<StatefulOp<Integer>> cf) {
+    private void testFlagsSetSequence(Supplier<StatefulTestOp<Integer>> cf) {
         EnumSet<StreamOpFlag> known = EnumSet.of(StreamOpFlag.ORDERED, StreamOpFlag.SIZED);
         EnumSet<StreamOpFlag> preserve = EnumSet.of(StreamOpFlag.DISTINCT, StreamOpFlag.SORTED);
 
-        List<IntermediateOp<Integer, Integer>> ops = new ArrayList<>();
+        List<IntermediateTestOp<Integer, Integer>> ops = new ArrayList<>();
         for (StreamOpFlag f : EnumSet.of(StreamOpFlag.DISTINCT, StreamOpFlag.SORTED)) {
             ops.add(cf.get());
             ops.add(new TestFlagExpectedOp<>(f.set(),
@@ -188,7 +184,7 @@
 
         StreamTestData<Integer> data = new StreamTestData.ArrayData<>("Array", countTo(10).toArray(new Integer[0]));
         @SuppressWarnings("rawtypes")
-        IntermediateOp[] opsArray = ops.toArray(new IntermediateOp[ops.size()]);
+        IntermediateTestOp[] opsArray = ops.toArray(new IntermediateTestOp[ops.size()]);
 
         withData(data).ops(opsArray).
                 without(StreamTestScenario.PAR_STREAM_TO_ARRAY_CLEAR_SIZED).
@@ -197,19 +193,15 @@
 
 
     public void testFlagsClearParallelCollect() {
-        testFlagsClearSequence(CollectorOps::parallelCollector);
+        testFlagsClearSequence(CollectorOps::collector);
     }
 
-    public void testFlagsClearSequentialCollect() {
-        testFlagsClearSequence(() -> CollectorOps.sequentialCollector());
-    }
-
-    protected void testFlagsClearSequence(Supplier<StatefulOp<Integer>> cf) {
+    protected void testFlagsClearSequence(Supplier<StatefulTestOp<Integer>> cf) {
         EnumSet<StreamOpFlag> known = EnumSet.of(StreamOpFlag.ORDERED, StreamOpFlag.SIZED);
         EnumSet<StreamOpFlag> preserve = EnumSet.of(StreamOpFlag.DISTINCT, StreamOpFlag.SORTED);
         EnumSet<StreamOpFlag> notKnown = EnumSet.noneOf(StreamOpFlag.class);
 
-        List<IntermediateOp<Integer, Integer>> ops = new ArrayList<>();
+        List<IntermediateTestOp<Integer, Integer>> ops = new ArrayList<>();
         for (StreamOpFlag f : EnumSet.of(StreamOpFlag.ORDERED, StreamOpFlag.DISTINCT, StreamOpFlag.SORTED)) {
             ops.add(cf.get());
             ops.add(new TestFlagExpectedOp<>(f.clear(),
@@ -228,7 +220,7 @@
 
         StreamTestData<Integer> data = new StreamTestData.ArrayData<>("Array", countTo(10).toArray(new Integer[0]));
         @SuppressWarnings("rawtypes")
-        IntermediateOp[] opsArray = ops.toArray(new IntermediateOp[ops.size()]);
+        IntermediateTestOp[] opsArray = ops.toArray(new IntermediateTestOp[ops.size()]);
 
         withData(data).ops(opsArray).
                 without(StreamTestScenario.PAR_STREAM_TO_ARRAY_CLEAR_SIZED).
@@ -239,22 +231,22 @@
         EnumSet<StreamOpFlag> parKnown = EnumSet.of(StreamOpFlag.SIZED);
         EnumSet<StreamOpFlag> serKnown = parKnown.clone();
 
-        List<IntermediateOp<Integer, Integer>> ops = new ArrayList<>();
+        List<IntermediateTestOp<Integer, Integer>> ops = new ArrayList<>();
         for (StreamOpFlag f : parKnown) {
-            ops.add(CollectorOps.parallelCollector());
+            ops.add(CollectorOps.collector());
             ops.add(new ParSerTestFlagExpectedOp<>(f.clear(),
                                              parKnown,
                                              serKnown));
             serKnown.remove(f);
         }
-        ops.add(CollectorOps.parallelCollector());
+        ops.add(CollectorOps.collector());
         ops.add(new ParSerTestFlagExpectedOp<>(0,
                                          parKnown,
                                          EnumSet.noneOf(StreamOpFlag.class)));
 
         StreamTestData<Integer> data = new StreamTestData.ArrayData<>("Array", countTo(10).toArray(new Integer[0]));
         @SuppressWarnings("rawtypes")
-        IntermediateOp[] opsArray = ops.toArray(new IntermediateOp[ops.size()]);
+        IntermediateTestOp[] opsArray = ops.toArray(new IntermediateTestOp[ops.size()]);
 
         withData(data).ops(opsArray).exercise();
     }
@@ -271,13 +263,13 @@
 
         @Override
         @SuppressWarnings({"unchecked", "rawtypes"})
-        public Sink<T> opWrapSink(int flags, Sink upstream) {
-            assertFlags(flags);
+        public Sink<T> opWrapSink(int flags, boolean parallel, Sink upstream) {
+            assertFlags(flags, parallel);
             return upstream;
         }
 
-        protected void assertFlags(int flags) {
-            if (StreamOpFlag.PARALLEL.isKnown(flags)) {
+        protected void assertFlags(int flags, boolean parallel) {
+            if (parallel) {
                 for (StreamOpFlag f : parKnown) {
                     Assert.assertTrue(f.isKnown(flags), String.format("Flag %s is not known, but should be known.", f.toString()));
                 }
--- a/test-ng/boottests/java/util/stream/StreamFlagsTest.java	Thu Apr 04 22:28:40 2013 -0400
+++ b/test-ng/boottests/java/util/stream/StreamFlagsTest.java	Fri Apr 05 11:25:17 2013 +0200
@@ -68,22 +68,22 @@
 
         assertFlags(OpTestCase.getStreamFlags(arrayList),
                     EnumSet.of(ORDERED, SIZED),
-                    EnumSet.of(DISTINCT, SORTED, SHORT_CIRCUIT, PARALLEL));
+                    EnumSet.of(DISTINCT, SORTED, SHORT_CIRCUIT));
         assertFlags(OpTestCase.getStreamFlags(linkedList),
                     EnumSet.of(ORDERED, SIZED),
-                    EnumSet.of(DISTINCT, SORTED, SHORT_CIRCUIT, PARALLEL));
+                    EnumSet.of(DISTINCT, SORTED, SHORT_CIRCUIT));
         assertFlags(OpTestCase.getStreamFlags(hashSet),
                     EnumSet.of(SIZED, DISTINCT),
-                    EnumSet.of(ORDERED, SORTED, SHORT_CIRCUIT, PARALLEL));
+                    EnumSet.of(ORDERED, SORTED, SHORT_CIRCUIT));
         assertFlags(OpTestCase.getStreamFlags(treeSet),
                     EnumSet.of(ORDERED, SIZED, DISTINCT, SORTED),
-                    EnumSet.of(SHORT_CIRCUIT, PARALLEL));
+                    EnumSet.of(SHORT_CIRCUIT));
         assertFlags(OpTestCase.getStreamFlags(linkedHashSet),
                     EnumSet.of(ORDERED, DISTINCT, SIZED),
-                    EnumSet.of(SORTED, SHORT_CIRCUIT, PARALLEL));
+                    EnumSet.of(SORTED, SHORT_CIRCUIT));
         assertFlags(OpTestCase.getStreamFlags(repeat),
                     EnumSet.of(ORDERED),
-                    EnumSet.of(SIZED, DISTINCT, SORTED, SHORT_CIRCUIT, PARALLEL));
+                    EnumSet.of(SIZED, DISTINCT, SORTED, SHORT_CIRCUIT));
     }
 
     public void testFilter() {
--- a/test-ng/boottests/java/util/stream/StreamOpFlagsTest.java	Thu Apr 04 22:28:40 2013 -0400
+++ b/test-ng/boottests/java/util/stream/StreamOpFlagsTest.java	Fri Apr 05 11:25:17 2013 +0200
@@ -193,9 +193,6 @@
 
         assertSpliteratorMask(StreamOpFlag.SHORT_CIRCUIT.set(), 0);
         assertSpliteratorMask(StreamOpFlag.SHORT_CIRCUIT.clear(), 0);
-
-        assertSpliteratorMask(StreamOpFlag.PARALLEL.set(), 0);
-        assertSpliteratorMask(StreamOpFlag.PARALLEL.clear(), 0);
     }
 
     private void assertSpliteratorMask(int actual, int expected) {
@@ -217,9 +214,6 @@
 
         assertStreamMask(StreamOpFlag.SHORT_CIRCUIT.set(), 0);
         assertStreamMask(StreamOpFlag.SHORT_CIRCUIT.clear(), 0);
-
-        assertStreamMask(StreamOpFlag.PARALLEL.set(), StreamOpFlag.IS_PARALLEL);
-        assertStreamMask(StreamOpFlag.PARALLEL.clear(), 0);
     }
 
     private void assertStreamMask(int actual, int expected) {
@@ -241,9 +235,6 @@
 
         assertOpMask(StreamOpFlag.SHORT_CIRCUIT.set(), StreamOpFlag.IS_SHORT_CIRCUIT);
         assertOpMask(StreamOpFlag.SHORT_CIRCUIT.clear(), 0);
-
-        assertOpMask(StreamOpFlag.PARALLEL.set(), 0);
-        assertOpMask(StreamOpFlag.PARALLEL.clear(), StreamOpFlag.NOT_PARALLEL);
     }
 
     private void assertOpMask(int actual, int expected) {
@@ -265,9 +256,6 @@
 
         assertTerminalOpMask(StreamOpFlag.SHORT_CIRCUIT.set(), StreamOpFlag.IS_SHORT_CIRCUIT);
         assertTerminalOpMask(StreamOpFlag.SHORT_CIRCUIT.clear(), 0);
-
-        assertTerminalOpMask(StreamOpFlag.PARALLEL.set(), 0);
-        assertTerminalOpMask(StreamOpFlag.PARALLEL.clear(), 0);
     }
 
     private void assertTerminalOpMask(int actual, int expected) {
@@ -289,9 +277,6 @@
 
         assertUpstreamTerminalOpMask(StreamOpFlag.SHORT_CIRCUIT.set(), 0);
         assertUpstreamTerminalOpMask(StreamOpFlag.SHORT_CIRCUIT.clear(), 0);
-
-        assertUpstreamTerminalOpMask(StreamOpFlag.PARALLEL.set(), 0);
-        assertUpstreamTerminalOpMask(StreamOpFlag.PARALLEL.clear(), 0);
     }
 
     private void assertUpstreamTerminalOpMask(int actual, int expected) {
@@ -307,7 +292,6 @@
         List<Integer> others = Arrays.asList(Spliterator.NONNULL, Spliterator.IMMUTABLE,
                                              Spliterator.CONCURRENT, Spliterator.SUBSIZED);
         for (int c : others) {
-            assertNotEquals(c, StreamOpFlag.IS_PARALLEL);
             assertNotEquals(c, StreamOpFlag.IS_SHORT_CIRCUIT);
         }
     }
@@ -327,9 +311,6 @@
 
         assertSpliteratorCharacteristicsMask(StreamOpFlag.SHORT_CIRCUIT.set(), 0);
         assertSpliteratorCharacteristicsMask(StreamOpFlag.SHORT_CIRCUIT.clear(), 0);
-
-        assertSpliteratorCharacteristicsMask(StreamOpFlag.PARALLEL.set(), 0);
-        assertSpliteratorCharacteristicsMask(StreamOpFlag.PARALLEL.clear(), 0);
     }
 
     private void assertSpliteratorCharacteristicsMask(int actual, int expected) {
--- a/test-ng/boottests/java/util/stream/UnorderedTest.java	Thu Apr 04 22:28:40 2013 -0400
+++ b/test-ng/boottests/java/util/stream/UnorderedTest.java	Fri Apr 05 11:25:17 2013 +0200
@@ -201,7 +201,7 @@
         }
     }
 
-    static class CheckClearOrderedOp<T> implements IntermediateOp<T, T> {
+    static class CheckClearOrderedOp<T> implements StatelessTestOp<T, T> {
         private final StreamShape shape;
 
         CheckClearOrderedOp(StreamShape shape) {
@@ -219,8 +219,8 @@
         }
 
         @Override
-        public Sink<T> opWrapSink(int flags, Sink<T> sink) {
-            if (StreamOpFlag.PARALLEL.isKnown(flags)) {
+        public Sink<T> opWrapSink(int flags, boolean parallel, Sink<T> sink) {
+            if (parallel) {
                 assertTrue(StreamOpFlag.ORDERED.isCleared(flags));
             }
 
@@ -235,7 +235,7 @@
         }
 
         @Override
-        public Sink<T> opWrapSink(int flags, Sink<T> sink) {
+        public Sink<T> opWrapSink(int flags, boolean parallel, Sink<T> sink) {
             assertTrue(StreamOpFlag.ORDERED.isKnown(flags) || StreamOpFlag.ORDERED.isPreserved(flags));
 
             return sink;
--- a/test-ng/tests/org/openjdk/tests/java/util/stream/StreamSpliteratorTest.java	Thu Apr 04 22:28:40 2013 -0400
+++ b/test-ng/tests/org/openjdk/tests/java/util/stream/StreamSpliteratorTest.java	Fri Apr 05 11:25:17 2013 +0200
@@ -25,6 +25,7 @@
 package org.openjdk.tests.java.util.stream;
 
 import java.util.Arrays;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Spliterator;
 import java.util.function.Consumer;
@@ -107,6 +108,11 @@
         }
 
         @Override
+        public Comparator<? super T> getComparator() {
+            return sp.getComparator();
+        }
+
+        @Override
         public int characteristics() {
             if (proxyEstimateSize)
                 return sp.characteristics();