changeset 7805:f7fb4e3d6a20

- make PipelineHelper an abstract class AbstractPipeline extends from - ops now extend from abstract shape-specific pipeline classes Contributed-by: Brian Goetz <brian.goetz@Oracle.COM>, Paul Sandoz <paul.sandoz@Oracle.COM>
author psandoz
date Wed, 03 Apr 2013 21:51:55 +0200
parents 7a7f3b25d409
children 2ffb91b59273
files src/share/classes/java/util/stream/AbstractPipeline.java src/share/classes/java/util/stream/AbstractShortCircuitTask.java src/share/classes/java/util/stream/AbstractTask.java src/share/classes/java/util/stream/DistinctOp.java src/share/classes/java/util/stream/DistinctOps.java src/share/classes/java/util/stream/DoublePipeline.java src/share/classes/java/util/stream/FindOps.java src/share/classes/java/util/stream/ForEachOps.java src/share/classes/java/util/stream/IntPipeline.java src/share/classes/java/util/stream/IntermediateOp.java src/share/classes/java/util/stream/LongPipeline.java src/share/classes/java/util/stream/MatchOps.java src/share/classes/java/util/stream/NodeUtils.java src/share/classes/java/util/stream/PipelineHelper.java src/share/classes/java/util/stream/ReduceOps.java src/share/classes/java/util/stream/ReferencePipeline.java src/share/classes/java/util/stream/SliceOp.java src/share/classes/java/util/stream/SliceOps.java src/share/classes/java/util/stream/SortedOp.java src/share/classes/java/util/stream/SortedOps.java src/share/classes/java/util/stream/StatefulOp.java src/share/classes/java/util/stream/StreamSpliterators.java src/share/classes/java/util/stream/Streams.java src/share/classes/java/util/stream/TerminalOp.java test-ng/bootlib/java/util/stream/CollectorOps.java test-ng/bootlib/java/util/stream/FlagDeclaringOp.java test-ng/bootlib/java/util/stream/IntermediateOp.java test-ng/bootlib/java/util/stream/OpTestCase.java test-ng/bootlib/java/util/stream/StatefulOp.java test-ng/bootlib/java/util/stream/TestFlagExpectedOp.java test-ng/boottests/java/util/stream/FlagOpTest.java test-ng/boottests/java/util/stream/UnorderedTest.java test-ng/tests/org/openjdk/tests/java/util/stream/RangeTest.java test-ng/tests/org/openjdk/tests/java/util/stream/TabulatorsTest.java
diffstat 34 files changed, 2966 insertions(+), 2459 deletions(-) [+]
line wrap: on
line diff
--- a/src/share/classes/java/util/stream/AbstractPipeline.java	Wed Apr 03 11:29:23 2013 -0700
+++ b/src/share/classes/java/util/stream/AbstractPipeline.java	Wed Apr 03 21:51:55 2013 +0200
@@ -66,15 +66,16 @@
  * @param <S> Type of the subclass implementing {@code BaseStream}
  * @since 1.8
  */
-abstract class AbstractPipeline<E_IN, E_OUT, S extends BaseStream<E_OUT, S>> /* implements BaseStream */ {
+abstract class AbstractPipeline<E_IN, E_OUT, S extends BaseStream<E_OUT, S>>
+        extends PipelineHelper<E_OUT> {
     /** Backlink to the head of the pipeline chain (self if this is the source stage) */
     private final AbstractPipeline sourceStage;
 
     /** The "upstream" pipeline, or null if this is the source stage */
     private final AbstractPipeline previousStage;
 
-    /** The intermediate operation represented by this pipeline object, or null if this is the source stage */
-    private final IntermediateOp op;
+    /** The operation flags for the intermediate operation represented by this pipeline object */
+    protected final int sourceOrOpFlags;
 
     /** The next stage in the pipeline, or null if this is the last stage.
      * Effectively final at the point of linking to the next pipeline.
@@ -107,13 +108,13 @@
     private Supplier<? extends Spliterator<?>> sourceSupplier;
 
     /** True if this pipeline has been consumed */
-    boolean linkedOrConsumed;
+    private boolean linkedOrConsumed;
 
     /** True if there are any stateful ops in the pipeline; only valid for the source stage */
-    boolean sourceAnyStateful;
+    private boolean sourceAnyStateful;
 
     /** True if there have been any calls to .sequential() or .parallel(); only valid for the source stage */
-    boolean sourceAnyParChange;
+    private boolean sourceAnyParChange;
 
     /**
      * Constructor for the head of a stream pipeline.
@@ -121,13 +122,13 @@
      * @param source {@code Supplier<Spliterator>} describing the stream source
      * @param sourceFlags The source flags for the stream source, described in {@link StreamOpFlag}
      */
-    protected AbstractPipeline(Supplier<? extends Spliterator<?>> source,
-                               int sourceFlags) {
+    AbstractPipeline(Supplier<? extends Spliterator<?>> source,
+                     int sourceFlags) {
         this.previousStage = null;
-        this.op = null;
         this.sourceSupplier = source;
         this.sourceStage = this;
-        this.combinedFlags = StreamOpFlag.combineOpFlags(sourceFlags, StreamOpFlag.INITIAL_OPS_VALUE);
+        this.sourceOrOpFlags = StreamOpFlag.combineOpFlags(sourceFlags, StreamOpFlag.INITIAL_OPS_VALUE);
+        this.combinedFlags = sourceOrOpFlags;
         this.depth = 0;
     }
 
@@ -137,13 +138,13 @@
      * @param source {@code Spliterator} describing the stream source
      * @param sourceFlags The source flags for the stream source, described in {@link StreamOpFlag}
      */
-    protected AbstractPipeline(Spliterator<?> source,
-                               int sourceFlags) {
+    AbstractPipeline(Spliterator<?> source,
+                     int sourceFlags) {
         this.previousStage = null;
-        this.op = null;
+        this.sourceOrOpFlags = StreamOpFlag.combineOpFlags(sourceFlags, StreamOpFlag.INITIAL_OPS_VALUE);
         this.sourceSpliterator = source;
         this.sourceStage = this;
-        this.combinedFlags = StreamOpFlag.combineOpFlags(sourceFlags, StreamOpFlag.INITIAL_OPS_VALUE);
+        this.combinedFlags = sourceOrOpFlags;
         this.depth = 0;
     }
 
@@ -151,75 +152,23 @@
      * Constructor for appending an intermediate operation onto an existing pipeline.
      *
      * @param previousStage the upstream element source.
-     * @param op the operation performed upon elements.
      */
-    protected AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage,
-                               IntermediateOp<E_IN, E_OUT> op) {
-        assert getOutputShape() == op.outputShape();
-        assert previousStage.getOutputShape() == op.inputShape();
+    AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage,
+                     int opFlags) {
         if (previousStage.linkedOrConsumed)
             throw new IllegalStateException("stream has already been operated upon");
         previousStage.linkedOrConsumed = true;
         previousStage.nextStage = this;
 
         this.previousStage = previousStage;
-        this.op = op;
-        this.combinedFlags = StreamOpFlag.combineOpFlags(op.getOpFlags() & StreamOpFlag.OP_MASK,
-                                                         previousStage.combinedFlags);
+        this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;
+        this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags);
         this.sourceStage = previousStage.sourceStage;
-        if (op.isStateful())
+        if (opIsStateful())
             sourceStage.sourceAnyStateful = true;
         this.depth = previousStage.depth + 1;
     }
 
-    // Chaining and result methods
-
-    /**
-     * Chain an operation to the tail of this pipeline to create a new stream.
-     *
-     * @param newOp the operation to chain.
-     * @param <E_NEXT> the type of elements output from the new stream.
-     * @param <S_NEXT> the type of stream.
-     * @return the new stream.
-     */
-    @SuppressWarnings("unchecked")
-    public <E_NEXT, S_NEXT extends BaseStream<E_NEXT, S_NEXT>>
-    S_NEXT pipeline(IntermediateOp<E_OUT, E_NEXT> newOp) {
-        return (S_NEXT) chain(this, newOp);
-    }
-
-    /** Specialized version of pipeline for stateless reference-bearing intermediate ops */
-    protected<V> Stream<V> chainedToRef(int opFlags,
-                                        StreamShape inputShape,
-                                        SinkWrapper<E_OUT> sinkWrapper) {
-        return new ReferencePipeline<>(this, new StatelessOp<>(opFlags, inputShape, StreamShape.REFERENCE,
-                                                               sinkWrapper));
-    }
-
-    /** Specialized version of pipeline for stateless int-bearing intermediate ops */
-    protected IntStream chainedToInt(int opFlags,
-                                     StreamShape inputShape,
-                                     SinkWrapper<E_OUT> sinkWrapper) {
-        return new IntPipeline<>(this, new StatelessOp<>(opFlags, inputShape, StreamShape.INT_VALUE,
-                                                         sinkWrapper));
-    }
-
-    /** Specialized version of pipeline for stateless long-bearing intermediate ops */
-    protected LongStream chainedToLong(int opFlags,
-                                       StreamShape inputShape,
-                                       SinkWrapper<E_OUT> sinkWrapper) {
-        return new LongPipeline<>(this, new StatelessOp<>(opFlags, inputShape, StreamShape.LONG_VALUE,
-                                                          sinkWrapper));
-    }
-
-    /** Specialized version of pipeline for stateless double-bearing intermediate ops */
-    protected DoubleStream chainedToDouble(int opFlags,
-                                           StreamShape inputShape,
-                                           SinkWrapper<E_OUT> sinkWrapper) {
-        return new DoublePipeline<>(this, new StatelessOp<>(opFlags, inputShape,
-                                                            StreamShape.DOUBLE_VALUE, sinkWrapper));
-    }
-
     /**
      * Prepare the pipeline for evaluation.
      * @param terminalFlags
@@ -232,8 +181,8 @@
                 for (AbstractPipeline u = sourceStage, p = sourceStage.nextStage;
                      p != null;
                      u = p, p = p.nextStage) {
-                    int thisOpFlags = p.op.getOpFlags();
-                    if (p.op.isStateful()) {
+                    int thisOpFlags = p.sourceOrOpFlags;
+                    if (p.opIsStateful()) {
                         // If the stateful operation is a short-circuit operation
                         // then move the back propagation head forwards
                         // NOTE: there are no size-injecting ops
@@ -264,7 +213,7 @@
                 for (AbstractPipeline u = sourceStage, p = sourceStage.nextStage;
                      p != null;
                      u = p, p = p.nextStage) {
-                    p.combinedFlags = StreamOpFlag.combineOpFlags(p.op.getOpFlags(), u.combinedFlags);
+                    p.combinedFlags = StreamOpFlag.combineOpFlags(p.sourceOrOpFlags, u.combinedFlags);
                 }
             }
         }
@@ -273,6 +222,9 @@
             combinedFlags = StreamOpFlag.combineOpFlags(terminalFlags, combinedFlags);
     }
 
+
+    // Terminal evaluation methods
+
     /**
      * Evaluate the pipeline with a terminal operation to produce a result.
      *
@@ -280,17 +232,16 @@
      * @param <R> the type of result.
      * @return the result.
      */
-    public <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
+    final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
         assert getOutputShape() == terminalOp.inputShape();
         if (linkedOrConsumed)
             throw new IllegalStateException("stream has already been operated upon");
         linkedOrConsumed = true;
 
         prepare(terminalOp.getOpFlags());
-        PipelineHelperImpl helper = new PipelineHelperImpl();
         return isParallel()
-               ? (R) terminalOp.evaluateParallel(helper, sourceSpliterator())
-               : (R) terminalOp.evaluateSequential(helper, sourceSpliterator());
+               ? (R) terminalOp.evaluateParallel(this, sourceSpliterator())
+               : (R) terminalOp.evaluateSequential(this, sourceSpliterator());
     }
 
     /**
@@ -299,7 +250,7 @@
      * @param generator the array generator to be used to create array instances.
      * @return a node that holds the collected output elements.
      */
-    public Node<E_OUT> evaluateToArrayNode(IntFunction<E_OUT[]> generator) {
+    final Node<E_OUT> evaluateToArrayNode(IntFunction<E_OUT[]> generator) {
         if (linkedOrConsumed)
             throw new IllegalStateException("stream has already been operated upon");
         linkedOrConsumed = true;
@@ -307,25 +258,19 @@
         prepare(0);
         // If the last intermediate operation is stateful then
         // evaluate directly to avoid an extra collection step
-        if (isParallel() && op != null && op.isStateful()) {
-            PipelineHelperImpl helper = previousStage.new PipelineHelperImpl();
-            return op.evaluateParallel(helper, previousStage.sourceSpliterator(), generator)
-                     .flatten(generator);
+        if (isParallel() && previousStage != null && opIsStateful()) {
+            return opEvaluateParallel(previousStage, previousStage.sourceSpliterator(), generator)
+                    .flatten(generator);
         }
         else {
-            PipelineHelperImpl helper = new PipelineHelperImpl();
-            return helper.evaluate(sourceSpliterator(), true, generator);
+            return evaluate(sourceSpliterator(), true, generator);
         }
     }
 
-    /** Common implementation of {@code limit()} / {@code substream} for all shapes */
-    protected S slice(long skip, long limit) {
-        // @@@ Optimize for case where depth=0 or pipeline is size-preserving
-        return pipeline(new SliceOp<E_OUT>(skip, limit, getOutputShape()));
-    }
 
-    // Implements sequential() from BaseStream
-    public S sequential() {
+    // BaseStream
+
+    public final S sequential() {
         if (StreamOpFlag.PARALLEL.isKnown(sourceStage.combinedFlags)) {
             sourceStage.sourceAnyParChange = true;
             sourceStage.combinedFlags = StreamOpFlag.combineOpFlags(StreamOpFlag.NOT_PARALLEL,
@@ -334,8 +279,7 @@
         return (S) this;
     }
 
-    // Implements parallel() from BaseStream
-    public S parallel() {
+    public final S parallel() {
         if (!StreamOpFlag.PARALLEL.isKnown(sourceStage.combinedFlags)) {
             sourceStage.sourceAnyParChange = true;
             sourceStage.combinedFlags = StreamOpFlag.combineOpFlags(StreamOpFlag.IS_PARALLEL,
@@ -344,96 +288,7 @@
         return (S) this;
     }
 
-    /**
-     * Get the output shape of the pipeline.
-     *
-     * @return the output shape. If the pipeline is the head then it's output shape corresponds to the shape of the
-     * source. Otherwise, it's output shape corresponds to the output shape of the associated operation.
-     */
-    protected abstract StreamShape getOutputShape();
-
-    /**
-     * Collect elements output from a pipeline into Node that holds elements of this shape.
-     *
-     * @param helper the parallel pipeline helper from which elements are obtained.
-     * @param spliterator the source spliterator
-     * @param flattenTree true of the returned node should be flattened to one node holding an
-     *                    array of elements.
-     * @param generator the array generator
-     * @return the node holding elements output from the pipeline.
-     */
-    protected abstract<P_IN> Node<E_OUT> evaluateToNode(PipelineHelper<P_IN, E_OUT> helper,
-                                                        Spliterator<P_IN> spliterator,
-                                                        boolean flattenTree,
-                                                        IntFunction<E_OUT[]> generator);
-
-    /**
-     * Create a spliterator that wraps a source spliterator, compatible with this stream shape,
-     * and operations associated with a {@link PipelineHelper}.
-     *
-     * @param ph the pipeline helper.
-     * @param supplier the supplier of a spliterator
-     * @return the wrapping spliterator compatible with this shape.
-     */
-    protected abstract<P_IN> Spliterator<E_OUT> wrap(PipelineHelper<P_IN, E_OUT> ph,
-                                                     Supplier<Spliterator<P_IN>> supplier,
-                                                     boolean isParallel);
-
-    /**
-     * Create a lazy spliterator that wraps and obtains the supplied the spliterator
-     * when method is invoked on the lazy spliterator.
-     *
-     */
-    protected abstract Spliterator<E_OUT> lazySpliterator(Supplier<? extends Spliterator<E_OUT>> supplier);
-
-    /**
-     * Traverse elements of a spliterator, compatible with this stream shape, pushing those elements into a sink.
-     * <p>If the sink is cancelled no further elements will be pulled or pushed and this method will return.</p>
-     *
-     * @param spliterator the spliterator to pull elements from
-     * @param sink the sink to push elements to.
-     */
-    protected abstract void forEachWithCancel(Spliterator<E_OUT> spliterator, Sink<E_OUT> sink);
-
-    /**
-     * Make a node builder, compatible with this stream shape.
-     *
-     * @param exactSizeIfKnown if >=0 then a node builder will be created that has a fixed capacity of at most
-     *                         sizeIfKnown elements.
-     *                         If < 0 then the node builder has an unfixed capacity.
-     *                         A fixed capacity node builder will throw exceptions if an element is added and
-     *                         the builder has reached capacity.
-     * @param generator the array generator to be used to create instances of a T[] array. Note for factory
-     *                  implementations supporting primitive nodes then this parameter may be ignored.
-     * @return the node builder.
-     */
-    protected abstract Node.Builder<E_OUT> makeNodeBuilder(long exactSizeIfKnown,
-                                                           IntFunction<E_OUT[]> generator);
-
-    /**
-     * Create a new pipeline by chaining an intermediate operation to an upstream pipeline.
-     * <p>
-     * The output shape if the upstream pipeline must be the same as the input shape of
-     * the intermediate operation.
-     * </p>
-     * @param upstream the upstream pipeline.
-     * @param op the intermediate operation.
-     * @param <U> the type of elements output from the upstream pipeline and input to the new stream.
-     * @param <V> the type of elements output from the new pipeline.
-     * @return a the new pipeline.
-     */
-    static <U, V> AbstractPipeline<U, V, ?> chain(AbstractPipeline<?, U, ?> upstream,
-                                                  IntermediateOp<U, V> op) {
-        switch (op.outputShape()) {
-            case REFERENCE:    return new ReferencePipeline<>(upstream, op);
-            case INT_VALUE:    return new IntPipeline(upstream, op);
-            case LONG_VALUE:   return new LongPipeline(upstream, op);
-            case DOUBLE_VALUE: return new DoublePipeline(upstream, op);
-            default: throw new IllegalStateException("Unknown shape: " + op.outputShape());
-        }
-    }
-
-    // from BaseStream
+    // Primitive specializations use covariant overrides, hence this is not final
     public Spliterator<E_OUT> spliterator() {
         if (linkedOrConsumed)
             throw new IllegalStateException("stream has already been operated upon");
@@ -456,11 +311,17 @@
             }
         }
         else {
-            PipelineHelperImpl helper = new PipelineHelperImpl();
-            return wrap(helper, () -> sourceSpliterator(), isParallel());
+            return wrap(this, () -> sourceSpliterator(), isParallel());
         }
     }
 
+    public final boolean isParallel() {
+        return StreamOpFlag.PARALLEL.isKnown(sourceStage.combinedFlags);
+    }
+
+
+    //
+
     /**
      * Returns the composition of stream flags of the stream source and all
      * intermediate operations.
@@ -469,55 +330,13 @@
      *         intermediate operations
      * @see StreamOpFlag
      */
-    protected int getStreamFlags() {
+    final int getStreamFlags() {
         // @@@ Currently only used by tests, review and see if functionality
         //     can be replaced by spliterator().characteristics()
         prepare(0);
         return StreamOpFlag.toStreamFlags(combinedFlags);
     }
 
-    // from BaseStream
-    public boolean isParallel() {
-        return StreamOpFlag.PARALLEL.isKnown(sourceStage.combinedFlags);
-    }
-
-    interface SinkWrapper<T> {
-        public Sink<T> wrapSink(int flags, Sink sink);
-    }
-
-    static final class StatelessOp<T,U> implements IntermediateOp<T,U> {
-        private final int opFlags;
-        private final StreamShape inputShape, outputShape;
-        private final SinkWrapper<T> sinkWrapper;
-
-        StatelessOp(int opFlags,
-                    StreamShape inputShape,
-                    StreamShape outputShape,
-                    SinkWrapper<T> wrapper) {
-            this.opFlags = opFlags;
-            this.inputShape = inputShape;
-            this.outputShape = outputShape;
-            sinkWrapper = wrapper;
-        }
-
-        public StreamShape outputShape() {
-            return outputShape;
-        }
-
-        public StreamShape inputShape() {
-            return inputShape;
-        }
-
-        public int getOpFlags() {
-            return opFlags;
-        }
-
-        @Override
-        public Sink<T> wrapSink(int flags, Sink<U> sink) {
-            return sinkWrapper.wrapSink(flags, sink);
-        }
-    }
-
     /**
      * Get the source spliterator for this pipeline stage
      */
@@ -543,9 +362,8 @@
                  u != e;
                  u = p, p = p.nextStage) {
 
-                if (p.op.isStateful()) {
-                    PipelineHelperImpl helper = u.new PipelineHelperImpl();
-                    spliterator = p.op.evaluateParallelLazy(helper, spliterator);
+                if (p.opIsStateful()) {
+                    spliterator = p.opEvaluateParallelLazy(u, spliterator);
                 }
             }
         }
@@ -553,109 +371,224 @@
         return spliterator;
     }
 
-    private final class PipelineHelperImpl<P_IN> implements PipelineHelper<P_IN, E_OUT> {
 
-        @Override
-        public long exactOutputSizeIfKnown(Spliterator<P_IN> spliterator) {
-            return StreamOpFlag.SIZED.isKnown(getStreamAndOpFlags()) ? spliterator.getExactSizeIfKnown() : -1;
-        }
+    // PipelineHelper
 
-        @Override
-        public<S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator) {
-            copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);
-            return sink;
-        }
+    @Override
+    final <P_IN> long exactOutputSizeIfKnown(Spliterator<P_IN> spliterator) {
+        return StreamOpFlag.SIZED.isKnown(getStreamAndOpFlags()) ? spliterator.getExactSizeIfKnown() : -1;
+    }
 
-        @Override
-        public void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
-            Objects.requireNonNull(wrappedSink);
+    @Override
+    final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator) {
+        copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);
+        return sink;
+    }
 
-            if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
-                wrappedSink.begin(spliterator.getExactSizeIfKnown());
-                spliterator.forEachRemaining(wrappedSink);
-                wrappedSink.end();
-            }
-            else {
-                copyIntoWithCancel(wrappedSink, spliterator);
-            }
-        }
+    @Override
+    final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
+        Objects.requireNonNull(wrappedSink);
 
-        /**
-         * Pushes elements obtained from the {@code Spliterator} into the provided
-         * {@code Sink}, checking {@link Sink#cancellationRequested()} after each
-         * element, and stopping if cancellation is requested.
-         *
-         * @implSpec
-         * This method conforms to the {@code Sink} protocol of calling
-         * {@code Sink.begin} before pushing elements, via {@code Sink.accept}, and
-         * calling {@code Sink.end} after all elements have been pushed or if
-         * cancellation is requested.
-         *
-         * @param wrappedSink the destination {@code Sink}
-         * @param spliterator the source {@code Spliterator}
-         */
-        private void copyIntoWithCancel(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
-            AbstractPipeline p = AbstractPipeline.this;
-            while (p.depth > 0) {
-                p = p.previousStage;
-            }
+        if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
             wrappedSink.begin(spliterator.getExactSizeIfKnown());
-            p.forEachWithCancel(spliterator, wrappedSink);
+            spliterator.forEachRemaining(wrappedSink);
             wrappedSink.end();
         }
-
-        @Override
-        public int getStreamAndOpFlags() {
-            return combinedFlags;
-        }
-
-        @Override
-        public Sink<P_IN> wrapSink(Sink sink) {
-            Objects.requireNonNull(sink);
-
-            for (AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
-                sink = p.op.wrapSink(p.previousStage.combinedFlags, sink);
-            }
-            return sink;
-        }
-
-        @Override
-        public Node.Builder<E_OUT> makeNodeBuilder(long exactSizeIfKnown,
-                                                   IntFunction<E_OUT[]> generator) {
-            return AbstractPipeline.this.makeNodeBuilder(exactSizeIfKnown, generator);
-        }
-
-        @Override
-        @SuppressWarnings("unchecked")
-        public Node<E_OUT> evaluate(Spliterator<P_IN> spliterator,
-                                    boolean flatten,
-                                    IntFunction<E_OUT[]> generator) {
-            if (isParallel()) {
-                // @@@ Optimize if op of this pipeline stage is a stateful op
-                return evaluateToNode(this, spliterator, flatten, generator);
-            }
-            else {
-                Node.Builder<E_OUT> nb = makeNodeBuilder(
-                        exactOutputSizeIfKnown(spliterator), generator);
-                return wrapAndCopyInto(nb, spliterator).build();
-            }
-        }
-
-        @Override
-        public Node<E_OUT> evaluateSequential(IntermediateOp<E_OUT, E_OUT> op,
-                                              Spliterator<P_IN> sourceSpliterator,
-                                              IntFunction<E_OUT[]> generator) {
-            long sizeIfKnown = StreamOpFlag.SIZED.isPreserved(op.getOpFlags())
-                               ? exactOutputSizeIfKnown(sourceSpliterator)
-                               : -1;
-            final Node.Builder<E_OUT> nb = makeNodeBuilder(sizeIfKnown, generator);
-            Sink<E_OUT> opSink = op.wrapSink(getStreamAndOpFlags(), nb);
-
-            if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(op.getOpFlags()))
-                wrapAndCopyInto(opSink, sourceSpliterator);
-            else
-                copyIntoWithCancel(wrapSink(opSink), sourceSpliterator);
-            return nb.build();
+        else {
+            copyIntoWithCancel(wrappedSink, spliterator);
         }
     }
+
+    @Override
+    final <P_IN> void copyIntoWithCancel(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
+        AbstractPipeline p = AbstractPipeline.this;
+        while (p.depth > 0) {
+            p = p.previousStage;
+        }
+        wrappedSink.begin(spliterator.getExactSizeIfKnown());
+        p.forEachWithCancel(spliterator, wrappedSink);
+        wrappedSink.end();
+    }
+
+    @Override
+    final int getStreamAndOpFlags() {
+        return combinedFlags;
+    }
+
+    @Override
+    final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
+        Objects.requireNonNull(sink);
+
+        for (AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
+            sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
+        }
+        return (Sink<P_IN>) sink;
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    final <P_IN> Node<E_OUT> evaluate(Spliterator<P_IN> spliterator,
+                                      boolean flatten,
+                                      IntFunction<E_OUT[]> generator) {
+        if (isParallel()) {
+            // @@@ Optimize if op of this pipeline stage is a stateful op
+            return evaluateToNode(this, spliterator, flatten, generator);
+        }
+        else {
+            Node.Builder<E_OUT> nb = makeNodeBuilder(
+                    exactOutputSizeIfKnown(spliterator), generator);
+            return wrapAndCopyInto(nb, spliterator).build();
+        }
+    }
+
+
+    // Shape-specific abstract methods
+
+    /**
+     * Get the output shape of the pipeline.
+     *
+     * @return the output shape. If the pipeline is the head then its output shape corresponds to the shape of the
+     * source. Otherwise, its output shape corresponds to the output shape of the associated operation.
+     */
+    abstract StreamShape getOutputShape();
+
+    /**
+     * Collect elements output from a pipeline into Node that holds elements of
+     * this shape.
+     *
+     * @param helper the parallel pipeline helper from which elements are
+     * obtained.
+     * @param spliterator the source spliterator
+     * @param flattenTree true if the returned node should be flattened to one
+     * node holding an array of elements.
+     * @param generator the array generator
+     * @return the node holding elements output from the pipeline.
+     */
+    abstract <P_IN> Node<E_OUT> evaluateToNode(PipelineHelper<E_OUT> helper,
+                                               Spliterator<P_IN> spliterator,
+                                               boolean flattenTree,
+                                               IntFunction<E_OUT[]> generator);
+
+    /**
+     * Create a spliterator that wraps a source spliterator, compatible with
+     * this stream shape, and operations associated with a {@link
+     * PipelineHelper}.
+     *
+     * @param ph the pipeline helper.
+     * @param supplier the supplier of a spliterator
+     * @return the wrapping spliterator compatible with this shape.
+     */
+    abstract <P_IN> Spliterator<E_OUT> wrap(PipelineHelper<E_OUT> ph,
+                                            Supplier<Spliterator<P_IN>> supplier,
+                                            boolean isParallel);
+
+    /**
+     * Create a lazy spliterator that wraps and obtains the supplied spliterator
+     * when a method is invoked on the lazy spliterator.
+     *
+     */
+    abstract Spliterator<E_OUT> lazySpliterator(Supplier<? extends Spliterator<E_OUT>> supplier);
+
+    /**
+     * Traverse elements of a spliterator, compatible with this stream shape, pushing those elements into a sink.
+     * <p>If the sink is cancelled no further elements will be pulled or pushed and this method will return.</p>
+     *
+     * @param spliterator the spliterator to pull elements from
+     * @param sink the sink to push elements to.
+     */
+    abstract void forEachWithCancel(Spliterator<E_OUT> spliterator, Sink<E_OUT> sink);
+
+    /**
+     * Make a node builder, compatible with this stream shape.
+     *
+     * @param exactSizeIfKnown if >=0 then a node builder will be created that
+     * has a fixed capacity of at most sizeIfKnown elements. If &lt; 0 then the
+     * node builder has an unfixed capacity. A fixed capacity node builder will
+     * throw exceptions if an element is added and the builder has reached
+     * capacity.
+     * @param generator the array generator to be used to create instances of a
+     * T[] array. Note for factory implementations supporting primitive nodes
+     * then this parameter may be ignored.
+     * @return the node builder.
+     */
+    abstract Node.Builder<E_OUT> makeNodeBuilder(long exactSizeIfKnown,
+                                                 IntFunction<E_OUT[]> generator);
+
+
+    // Op-specific abstract methods
+
+    /**
+     * Returns whether this operation is stateful or not.  If it is stateful,
+     * then the method
+     * {@link #opEvaluateParallel(PipelineHelper, java.util.Spliterator, java.util.function.IntFunction)}
+     * must be overridden.
+     *
+     * @implSpec The default implementation returns {@code false}.
+     * @return {@code true} if this operation is stateful
+     */
+    abstract boolean opIsStateful();
+
+    /**
+     * Accepts a {@code Sink} which will receive the results of this operation,
+     * and return a {@code Sink} which accepts elements of the input type of
+     * this operation and which performs the operation, passing the results to
+     * the provided {@code Sink}.
+     *
+     * <p>The implementation may use the {@code flags} parameter to optimize the
+     * sink wrapping.  For example, if the input is already {@code DISTINCT},
+     * the implementation for the {@code Stream#distinct()} method could just
+     * return the sink it was passed.
+     *
+     * @param flags The combined stream and operation flags up to, but not
+     *        including, this operation.
+     * @param sink elements will be sent to this sink after the processing.
+     * @return a sink which will accept elements and perform the operation upon
+     *         each element, passing the results (if any) to the provided
+     *         {@code Sink}.
+     */
+    abstract Sink<E_IN> opWrapSink(int flags, Sink<E_OUT> sink);
+
+    /**
+     * Performs a parallel evaluation of the operation using the specified
+     * {@code PipelineHelper} which describes the stream source and upstream
+     * intermediate operations.  Only called on stateful operations.  If {@link
+     * #opIsStateful()} returns true then implementations must override the
+     * default implementation.
+     *
+     * @param helper the pipeline helper
+     * @param spliterator the source {@code Spliterator}
+     * @param generator the array generator
+     * @return a {@code Node} describing the result of the evaluation
+     * @implSpec The default implementation throws an {@link
+     * UnsupportedOperationException}
+     */
+    <P_IN> Node<E_OUT> opEvaluateParallel(PipelineHelper<E_OUT> helper,
+                                          Spliterator<P_IN> spliterator,
+                                          IntFunction<E_OUT[]> generator) {
+        throw new UnsupportedOperationException("Parallel evaluation is not supported");
+    }
+
+    /**
+     * Returns a {@code Spliterator} describing a parallel evaluation of the
+     * operation using the specified {@code PipelineHelper} which describes the
+     * stream source and upstream intermediate operations.  Only called on
+     * stateful operations.  It is not necessary (though acceptable) to do a
+     * full computation of the result here; it is preferable, if possible, to
+     * describe the result via a lazily evaluated spliterator.
+     *
+     * @param helper the pipeline helper
+     * @param spliterator the source {@code Spliterator}
+     * @return a {@code Spliterator} describing the result of the evaluation
+     * @implSpec The default implementation behaves as if:
+     * <pre>{@code
+     *     return opEvaluateParallel(helper, spliterator,
+     *                               i -> (E_OUT[]) new Object[i]).spliterator();
+     * }</pre>
+     * and is suitable for implementations that cannot do better than a full
+     * synchronous evaluation.
+     */
+    <P_IN> Spliterator<E_OUT> opEvaluateParallelLazy(PipelineHelper<E_OUT> helper,
+                                                     Spliterator<P_IN> spliterator) {
+        return opEvaluateParallel(helper, spliterator, i -> (E_OUT[]) new Object[i]).spliterator();
+    }
 }
--- a/src/share/classes/java/util/stream/AbstractShortCircuitTask.java	Wed Apr 03 11:29:23 2013 -0700
+++ b/src/share/classes/java/util/stream/AbstractShortCircuitTask.java	Wed Apr 03 21:51:55 2013 +0200
@@ -56,7 +56,7 @@
     protected volatile boolean canceled;
 
     /** Constructor for root nodes */
-    protected AbstractShortCircuitTask(PipelineHelper<P_IN, P_OUT> helper,
+    protected AbstractShortCircuitTask(PipelineHelper<P_OUT> helper,
                                        Spliterator<P_IN> spliterator) {
         super(helper, spliterator);
         sharedResult = new AtomicReference<>(null);
--- a/src/share/classes/java/util/stream/AbstractTask.java	Wed Apr 03 11:29:23 2013 -0700
+++ b/src/share/classes/java/util/stream/AbstractTask.java	Wed Apr 03 21:51:55 2013 +0200
@@ -91,7 +91,7 @@
     static final int LEAF_TARGET = ForkJoinPool.getCommonPoolParallelism() << 2;
 
     /** The pipeline helper, common to all tasks in a computation */
-    protected final PipelineHelper<P_IN, P_OUT> helper;
+    protected final PipelineHelper<P_OUT> helper;
 
     /**
      * The spliterator for the portion of the input associated with the subtree
@@ -117,7 +117,7 @@
     /**
      * Constructor for root nodes.
      */
-    protected AbstractTask(PipelineHelper<P_IN, P_OUT> helper,
+    protected AbstractTask(PipelineHelper<P_OUT> helper,
                            Spliterator<P_IN> spliterator) {
         super(null);
         this.helper = helper;
@@ -129,7 +129,7 @@
      * Alternate constructor for root nodes that have already gotten the
      * Spliterator from the helper.
      */
-    protected AbstractTask(PipelineHelper<P_IN, P_OUT> helper,
+    protected AbstractTask(PipelineHelper<P_OUT> helper,
                            Spliterator<P_IN> spliterator,
                            long targetSize) {
         super(null);
@@ -176,7 +176,7 @@
      * Suggests whether it is adviseable to split the provided spliterator based
      * on target size and other considerations, such as pool state
      */
-    public static<P_IN, P_OUT> boolean suggestSplit(PipelineHelper<P_IN, P_OUT> helper,
+    public static<P_IN, P_OUT> boolean suggestSplit(PipelineHelper<P_OUT> helper,
                                                     Spliterator spliterator,
                                                     long targetSize) {
         long remaining = spliterator.estimateSize();
--- a/src/share/classes/java/util/stream/DistinctOp.java	Wed Apr 03 11:29:23 2013 -0700
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,151 +0,0 @@
-/*
- * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This code is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 only, as
- * published by the Free Software Foundation.  Oracle designates this
- * particular file as subject to the "Classpath" exception as provided
- * by Oracle in the LICENSE file that accompanied this code.
- *
- * This code is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
- * version 2 for more details (a copy is included in the LICENSE file that
- * accompanied this code).
- *
- * You should have received a copy of the GNU General Public License version
- * 2 along with this work; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
- *
- * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
- * or visit www.oracle.com if you need additional information or have any
- * questions.
- */
-package java.util.stream;
-
-import java.util.HashSet;
-import java.util.LinkedHashSet;
-import java.util.Objects;
-import java.util.Set;
-import java.util.Spliterator;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.IntFunction;
-
-/**
- * A {@link StatefulOp} that eliminates duplicates from a stream.
- * @param <T> The input and output type of the stream pipeline
- * @since 1.8
- */
-final class DistinctOp<T> implements StatefulOp<T> {
-    @Override
-    public int getOpFlags() {
-        return StreamOpFlag.IS_DISTINCT | StreamOpFlag.NOT_SIZED;
-    }
-
-    @Override
-    public Sink<T> wrapSink(int flags, Sink sink) {
-        Objects.requireNonNull(sink);
-
-        if (StreamOpFlag.DISTINCT.isKnown(flags)) {
-            return sink;
-        }
-        else if (StreamOpFlag.SORTED.isKnown(flags)) {
-            return new Sink.ChainedReference<T>(sink) {
-                boolean seenNull;
-                T lastSeen;
-
-                @Override
-                public void begin(long size) {
-                    seenNull = false;
-                    lastSeen = null;
-                    downstream.begin(-1);
-                }
-
-                @Override
-                public void end() {
-                    seenNull = false;
-                    lastSeen = null;
-                    downstream.end();
-                }
-
-                @Override
-                public void accept(T t) {
-                    if (t == null) {
-                        if (!seenNull) {
-                            seenNull = true;
-                            downstream.accept(lastSeen = null);
-                        }
-                    } else if (lastSeen == null || !t.equals(lastSeen)) {
-                        downstream.accept(lastSeen = t);
-                    }
-                }
-            };
-        }
-        else {
-            return new Sink.ChainedReference<T>(sink) {
-                Set<T> seen;
-
-                @Override
-                public void begin(long size) {
-                    seen = new HashSet<>();
-                    downstream.begin(-1);
-                }
-
-                @Override
-                public void end() {
-                    seen = null;
-                    downstream.end();
-                }
-
-                @Override
-                public void accept(T t) {
-                    if (!seen.contains(t)) {
-                        seen.add(t);
-                        downstream.accept(t);
-                    }
-                }
-            };
-        }
-    }
-
-    @Override
-    public <S> Node<T> evaluateParallel(PipelineHelper<S, T> helper,
-                                        Spliterator<S> spliterator,
-                                        IntFunction<T[]> generator) {
-        if (StreamOpFlag.DISTINCT.isKnown(helper.getStreamAndOpFlags())) {
-            // No-op
-            return helper.evaluate(spliterator, false, generator);
-        }
-        else if (StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
-            // If the stream is SORTED then it should also be ORDERED so the following will also
-            // preserve the sort order
-            TerminalOp<T, LinkedHashSet<T>> reduceOp
-                    = ReduceOps.<T, LinkedHashSet<T>>makeRef(LinkedHashSet::new, LinkedHashSet::add,
-                                                             LinkedHashSet::addAll);
-            return Nodes.node(reduceOp.evaluateParallel(helper, spliterator));
-        }
-        else {
-            // Holder of null state since ConcurrentHashMap does not support null values
-            AtomicBoolean seenNull = new AtomicBoolean(false);
-            ConcurrentHashMap<T, Boolean> map = new ConcurrentHashMap<>();
-            TerminalOp<T, Void> forEachOp = ForEachOps.makeRef(t -> {
-                if (t == null)
-                    seenNull.set(true);
-                else
-                    map.putIfAbsent(t, Boolean.TRUE);
-            }, false);
-            forEachOp.evaluateParallel(helper, spliterator);
-
-            // If null has been seen then copy the key set into a HashSet that supports null values
-            // and add null
-            Set<T> keys = map.keySet();
-            if (seenNull.get()) {
-                keys = new HashSet<>(keys);
-                keys.add(null);
-            }
-            return Nodes.node(keys);
-        }
-    }
-}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/share/classes/java/util/stream/DistinctOps.java	Wed Apr 03 21:51:55 2013 +0200
@@ -0,0 +1,157 @@
+/*
+ * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.  Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+package java.util.stream;
+
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.Objects;
+import java.util.Set;
+import java.util.Spliterator;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.IntFunction;
+
+/**
+ * Factory methods for transforming streams into duplicate-free streams.
+ * @since 1.8
+ */
+final class DistinctOps {
+
+    private DistinctOps() { }
+
+    /**
+     * Appends a "distinct" operation to the provided stream.
+     *
+     * @param <T> The type of both input and output elements
+     * @param upstream A reference stream with element type T
+     */
+    static<T> ReferencePipeline<T, T> makeRef(AbstractPipeline<?, T, ?> upstream) {
+        return new ReferencePipeline.StatefulOp<T, T>(upstream, StreamShape.REFERENCE,
+                                                      StreamOpFlag.IS_DISTINCT | StreamOpFlag.NOT_SIZED) {
+            @Override
+            <P_IN> Node<T> opEvaluateParallel(PipelineHelper<T> helper, Spliterator<P_IN> spliterator, IntFunction<T[]> generator) {
+                if (StreamOpFlag.DISTINCT.isKnown(helper.getStreamAndOpFlags())) {
+                    // No-op
+                    return helper.evaluate(spliterator, false, generator);
+                }
+                else if (StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
+                    // If the stream is SORTED then it should also be ORDERED so the following will also
+                    // preserve the sort order
+                    TerminalOp<T, LinkedHashSet<T>> reduceOp
+                            = ReduceOps.<T, LinkedHashSet<T>>makeRef(LinkedHashSet::new, LinkedHashSet::add,
+                                                                     LinkedHashSet::addAll);
+                    return Nodes.node(reduceOp.evaluateParallel(helper, spliterator));
+                }
+                else {
+                    // Holder of null state since ConcurrentHashMap does not support null values
+                    AtomicBoolean seenNull = new AtomicBoolean(false);
+                    ConcurrentHashMap<T, Boolean> map = new ConcurrentHashMap<>();
+                    TerminalOp<T, Void> forEachOp = ForEachOps.makeRef(t -> {
+                        if (t == null)
+                            seenNull.set(true);
+                        else
+                            map.putIfAbsent(t, Boolean.TRUE);
+                    }, false);
+                    forEachOp.evaluateParallel(helper, spliterator);
+
+                    // If null has been seen then copy the key set into a HashSet that supports null values
+                    // and add null
+                    Set<T> keys = map.keySet();
+                    if (seenNull.get()) {
+                        keys = new HashSet<>(keys);
+                        keys.add(null);
+                    }
+                    return Nodes.node(keys);
+                }
+            }
+
+            @Override
+            Sink<T> opWrapSink(int flags, Sink<T> sink) {
+                Objects.requireNonNull(sink);
+
+                if (StreamOpFlag.DISTINCT.isKnown(flags)) {
+                    return sink;
+                }
+                else if (StreamOpFlag.SORTED.isKnown(flags)) {
+                    return new Sink.ChainedReference<T>(sink) {
+                        boolean seenNull;
+                        T lastSeen;
+
+                        @Override
+                        public void begin(long size) {
+                            seenNull = false;
+                            lastSeen = null;
+                            downstream.begin(-1);
+                        }
+
+                        @Override
+                        public void end() {
+                            seenNull = false;
+                            lastSeen = null;
+                            downstream.end();
+                        }
+
+                        @Override
+                        public void accept(T t) {
+                            if (t == null) {
+                                if (!seenNull) {
+                                    seenNull = true;
+                                    downstream.accept(lastSeen = null);
+                                }
+                            } else if (lastSeen == null || !t.equals(lastSeen)) {
+                                downstream.accept(lastSeen = t);
+                            }
+                        }
+                    };
+                }
+                else {
+                    return new Sink.ChainedReference<T>(sink) {
+                        Set<T> seen;
+
+                        @Override
+                        public void begin(long size) {
+                            seen = new HashSet<>();
+                            downstream.begin(-1);
+                        }
+
+                        @Override
+                        public void end() {
+                            seen = null;
+                            downstream.end();
+                        }
+
+                        @Override
+                        public void accept(T t) {
+                            if (!seen.contains(t)) {
+                                seen.add(t);
+                                downstream.accept(t);
+                            }
+                        }
+                    };
+                }
+            }
+        };
+    }
+}
--- a/src/share/classes/java/util/stream/DoublePipeline.java	Wed Apr 03 11:29:23 2013 -0700
+++ b/src/share/classes/java/util/stream/DoublePipeline.java	Wed Apr 03 21:51:55 2013 +0200
@@ -48,15 +48,16 @@
  * @param <E_IN> Type of elements in the upstream source.
  * @since 1.8
  */
-class DoublePipeline<E_IN> extends AbstractPipeline<E_IN, Double, DoubleStream> implements DoubleStream {
+abstract class DoublePipeline<E_IN> extends AbstractPipeline<E_IN, Double, DoubleStream> implements DoubleStream {
 
     /**
      * Constructor for the head of a stream pipeline.
      *
      * @param source {@code Supplier<Spliterator>} describing the stream source
-     * @param sourceFlags The source flags for the stream source, described in {@link StreamOpFlag}
+     * @param sourceFlags The source flags for the stream source, described in
+     * {@link StreamOpFlag}
      */
-    public DoublePipeline(Supplier<? extends Spliterator<Double>> source, int sourceFlags) {
+    DoublePipeline(Supplier<? extends Spliterator<Double>> source, int sourceFlags) {
         super(source, sourceFlags);
     }
 
@@ -64,9 +65,10 @@
      * Constructor for the head of a stream pipeline.
      *
      * @param source {@code Spliterator} describing the stream source
-     * @param sourceFlags The source flags for the stream source, described in {@link StreamOpFlag}
+     * @param sourceFlags The source flags for the stream source, described in
+     * {@link StreamOpFlag}
      */
-    public DoublePipeline(Spliterator<Double> source, int sourceFlags) {
+    DoublePipeline(Spliterator<Double> source, int sourceFlags) {
         super(source, sourceFlags);
     }
 
@@ -74,44 +76,10 @@
      * Constructor for appending an intermediate operation onto an existing pipeline.
      *
      * @param upstream the upstream element source.
-     * @param op the operation performed upon elements.
+     * @param opFlags the operation flags
      */
-    public DoublePipeline(AbstractPipeline<?, E_IN, ?> upstream, IntermediateOp<E_IN, Double> op) {
-        super(upstream, op);
-    }
-
-    // Methods from AbstractPipeline
-
-    @Override
-    protected StreamShape getOutputShape() {
-        return StreamShape.DOUBLE_VALUE;
-    }
-
-    @Override
-    protected <P_IN> Node<Double> evaluateToNode(PipelineHelper<P_IN, Double> helper,
-                                                 Spliterator<P_IN> spliterator,
-                                                 boolean flattenTree,
-                                                 IntFunction<Double[]> generator) {
-        return NodeUtils.doubleCollect(helper, spliterator, flattenTree);
-    }
-
-    @Override
-    protected <P_IN> Spliterator<Double> wrap(PipelineHelper<P_IN, Double> ph,
-                                              Supplier<Spliterator<P_IN>> supplier,
-                                              boolean isParallel) {
-        return new StreamSpliterators.DoubleWrappingSpliterator<>(ph, supplier, isParallel);
-    }
-
-    @Override
-    protected Spliterator.OfDouble lazySpliterator(Supplier<? extends Spliterator<Double>> supplier) {
-        return new StreamSpliterators.DelegatingSpliterator.OfDouble((Supplier<Spliterator.OfDouble>) supplier);
-    }
-
-    @Override
-    protected void forEachWithCancel(Spliterator<Double> spliterator, Sink<Double> sink) {
-        Spliterator.OfDouble spl = adapt(spliterator);
-        DoubleConsumer adaptedSink = adapt(sink);
-        while (!sink.cancellationRequested() && spl.tryAdvance(adaptedSink)) { }
+    DoublePipeline(AbstractPipeline<?, E_IN, ?> upstream, int opFlags) {
+        super(upstream, opFlags);
     }
 
     private static DoubleConsumer adapt(Sink<Double> sink) {
@@ -125,22 +93,7 @@
         }
     }
 
-    @Override
-    protected Node.Builder<Double> makeNodeBuilder(long exactSizeIfKnown, IntFunction<Double[]> generator) {
-        return Nodes.doubleMakeBuilder(exactSizeIfKnown);
-    }
-
-    @Override
-    public Spliterator.OfDouble spliterator() {
-        return adapt(super.spliterator());
-    }
-
-    @Override
-    public Stream<Double> boxed() {
-        return mapToObj(Double::valueOf);
-    }
-
-    static Spliterator.OfDouble adapt(Spliterator<Double> s) {
+    private static Spliterator.OfDouble adapt(Spliterator<Double> s) {
         if (s instanceof Spliterator.OfDouble) {
             return (Spliterator.OfDouble) s;
         }
@@ -151,137 +104,228 @@
         }
     }
 
+
+    // Shape-specific methods
+
     @Override
-    public PrimitiveIterator.OfDouble iterator() {
+    final StreamShape getOutputShape() {
+        return StreamShape.DOUBLE_VALUE;
+    }
+
+    @Override
+    final <P_IN> Node<Double> evaluateToNode(PipelineHelper<Double> helper,
+                                             Spliterator<P_IN> spliterator,
+                                             boolean flattenTree,
+                                             IntFunction<Double[]> generator) {
+        return NodeUtils.doubleCollect(helper, spliterator, flattenTree);
+    }
+
+    @Override
+    final <P_IN> Spliterator<Double> wrap(PipelineHelper<Double> ph,
+                                          Supplier<Spliterator<P_IN>> supplier,
+                                          boolean isParallel) {
+        return new StreamSpliterators.DoubleWrappingSpliterator<>(ph, supplier, isParallel);
+    }
+
+    @Override
+    final Spliterator.OfDouble lazySpliterator(Supplier<? extends Spliterator<Double>> supplier) {
+        return new StreamSpliterators.DelegatingSpliterator.OfDouble((Supplier<Spliterator.OfDouble>) supplier);
+    }
+
+    @Override
+    final void forEachWithCancel(Spliterator<Double> spliterator, Sink<Double> sink) {
+        Spliterator.OfDouble spl = adapt(spliterator);
+        DoubleConsumer adaptedSink = adapt(sink);
+        while (!sink.cancellationRequested() && spl.tryAdvance(adaptedSink)) { }
+    }
+
+    @Override
+    final  Node.Builder<Double> makeNodeBuilder(long exactSizeIfKnown, IntFunction<Double[]> generator) {
+        return Nodes.doubleMakeBuilder(exactSizeIfKnown);
+    }
+
+
+    // DoubleStream
+
+    @Override
+    public final PrimitiveIterator.OfDouble iterator() {
         return Spliterators.iteratorFromSpliterator(spliterator());
     }
 
+    @Override
+    public final Spliterator.OfDouble spliterator() {
+        return adapt(super.spliterator());
+    }
+
     // Stateless intermediate ops from DoubleStream
 
     @Override
-    public DoubleStream map(DoubleUnaryOperator mapper) {
-        Objects.requireNonNull(mapper);
-        return chainedToDouble(StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT, StreamShape.DOUBLE_VALUE,
-                               (flags, sink) -> new Sink.ChainedDouble(sink) {
-                                   @Override
-                                   public void accept(double t) {
-                                       downstream.accept(mapper.applyAsDouble(t));
-                                   }
-                               });
+    public final Stream<Double> boxed() {
+        return mapToObj(Double::valueOf);
     }
 
     @Override
-    public <U> Stream<U> mapToObj(DoubleFunction<U> mapper) {
+    public final DoubleStream map(DoubleUnaryOperator mapper) {
         Objects.requireNonNull(mapper);
-        return chainedToRef(StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT, StreamShape.DOUBLE_VALUE,
-                            (flags, sink) -> new Sink.ChainedDouble(sink) {
-                                @Override
-                                public void accept(double t) {
-                                    downstream.accept(mapper.apply(t));
-                                }
-                            });
+        return new StatelessOp<Double>(this, StreamShape.DOUBLE_VALUE,
+                                       StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
+            @Override
+            Sink<Double> opWrapSink(int flags, Sink<Double> sink) {
+                return new Sink.ChainedDouble(sink) {
+                    @Override
+                    public void accept(double t) {
+                        downstream.accept(mapper.applyAsDouble(t));
+                    }
+                };
+            }
+        };
     }
 
     @Override
-    public IntStream mapToInt(DoubleToIntFunction mapper) {
+    public final <U> Stream<U> mapToObj(DoubleFunction<U> mapper) {
         Objects.requireNonNull(mapper);
-        return chainedToInt(StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT, StreamShape.DOUBLE_VALUE,
-                            (flags, sink) -> new Sink.ChainedDouble(sink) {
-                                @Override
-                                public void accept(double t) {
-                                    downstream.accept(mapper.applyAsInt(t));
-                                }
-                            });
+        return new ReferencePipeline.StatelessOp<Double, U>(this, StreamShape.DOUBLE_VALUE,
+                                                            StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
+            @Override
+            Sink<Double> opWrapSink(int flags, Sink<U> sink) {
+                return new Sink.ChainedDouble(sink) {
+                    @Override
+                    public void accept(double t) {
+                        downstream.accept(mapper.apply(t));
+                    }
+                };
+            }
+        };
     }
 
     @Override
-    public LongStream mapToLong(DoubleToLongFunction mapper) {
+    public final IntStream mapToInt(DoubleToIntFunction mapper) {
         Objects.requireNonNull(mapper);
-        return chainedToLong(StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT, StreamShape.DOUBLE_VALUE,
-                             (flags, sink) -> new Sink.ChainedDouble(sink) {
-                                 @Override
-                                 public void accept(double t) {
-                                     downstream.accept(mapper.applyAsLong(t));
-                                 }
-                             });
+        return new IntPipeline.StatelessOp<Double>(this, StreamShape.DOUBLE_VALUE,
+                                                   StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
+            @Override
+            Sink<Double> opWrapSink(int flags, Sink<Integer> sink) {
+                return new Sink.ChainedDouble(sink) {
+                    @Override
+                    public void accept(double t) {
+                        downstream.accept(mapper.applyAsInt(t));
+                    }
+                };
+            }
+        };
     }
 
     @Override
-    public DoubleStream flatMap(DoubleFunction<? extends DoubleStream> mapper) {
+    public final LongStream mapToLong(DoubleToLongFunction mapper) {
+        Objects.requireNonNull(mapper);
+        return new LongPipeline.StatelessOp<Double>(this, StreamShape.DOUBLE_VALUE,
+                                                    StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
+            @Override
+            Sink<Double> opWrapSink(int flags, Sink<Long> sink) {
+                return new Sink.ChainedDouble(sink) {
+                    @Override
+                    public void accept(double t) {
+                        downstream.accept(mapper.applyAsLong(t));
+                    }
+                };
+            }
+        };
+    }
+
+    @Override
+    public final DoubleStream flatMap(DoubleFunction<? extends DoubleStream> mapper) {
         return flatMap((double i, DoubleConsumer sink) -> mapper.apply(i).sequential().forEach(sink));
     }
 
     @Override
-    public DoubleStream flatMap(FlatMapper.OfDoubleToDouble mapper) {
+    public final DoubleStream flatMap(FlatMapper.OfDoubleToDouble mapper) {
         Objects.requireNonNull(mapper);
-        return chainedToDouble(StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED,
-                               StreamShape.DOUBLE_VALUE,
-                               (flags, sink) -> new Sink.ChainedDouble(sink) {
-                                   public void accept(double t) {
-                                       mapper.flattenInto(t, (Sink.OfDouble) downstream);
-                                   }
-                               });
+        return new StatelessOp<Double>(this, StreamShape.DOUBLE_VALUE,
+                                       StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
+            @Override
+            Sink<Double> opWrapSink(int flags, Sink<Double> sink) {
+                return new Sink.ChainedDouble(sink) {
+                    public void accept(double t) {
+                        mapper.flattenInto(t, (Sink.OfDouble) downstream);
+                    }
+                };
+            }
+        };
     }
 
     @Override
-    public DoubleStream filter(DoublePredicate predicate) {
+    public final DoubleStream filter(DoublePredicate predicate) {
         Objects.requireNonNull(predicate);
-        return chainedToDouble(StreamOpFlag.NOT_SIZED, StreamShape.DOUBLE_VALUE,
-                               (flags, sink) -> new Sink.ChainedDouble(sink) {
-                                   @Override
-                                   public void accept(double t) {
-                                       if (predicate.test(t))
-                                           downstream.accept(t);
-                                   }
-                               });
+        return new StatelessOp<Double>(this, StreamShape.DOUBLE_VALUE,
+                                       StreamOpFlag.NOT_SIZED) {
+            @Override
+            Sink<Double> opWrapSink(int flags, Sink<Double> sink) {
+                return new Sink.ChainedDouble(sink) {
+                    @Override
+                    public void accept(double t) {
+                        if (predicate.test(t))
+                            downstream.accept(t);
+                    }
+                };
+            }
+        };
     }
 
     @Override
-    public DoubleStream peek(DoubleConsumer consumer) {
+    public final DoubleStream peek(DoubleConsumer consumer) {
         Objects.requireNonNull(consumer);
-        return chainedToDouble(0, StreamShape.DOUBLE_VALUE,
-                               (flags, sink) -> new Sink.ChainedDouble(sink) {
-                                   @Override
-                                   public void accept(double t) {
-                                       consumer.accept(t);
-                                       downstream.accept(t);
-                                   }
-                               });
+        return new StatelessOp<Double>(this, StreamShape.DOUBLE_VALUE,
+                                       0) {
+            @Override
+            Sink<Double> opWrapSink(int flags, Sink<Double> sink) {
+                return new Sink.ChainedDouble(sink) {
+                    @Override
+                    public void accept(double t) {
+                        consumer.accept(t);
+                        downstream.accept(t);
+                    }
+                };
+            }
+        };
     }
 
     // Stateful intermediate ops from DoubleStream
 
-    @Override
-    public DoubleStream limit(long maxSize) {
-        if (maxSize < 0)
-            throw new IllegalArgumentException(Long.toString(maxSize));
-        return super.slice(0, maxSize);
+    private DoubleStream slice(long skip, long limit) {
+        return SliceOps.makeDouble(this, skip, limit);
     }
 
     @Override
-    public DoubleStream substream(long startingOffset) {
+    public final DoubleStream limit(long maxSize) {
+        if (maxSize < 0)
+            throw new IllegalArgumentException(Long.toString(maxSize));
+        return slice(0, maxSize);
+    }
+
+    @Override
+    public final DoubleStream substream(long startingOffset) {
         if (startingOffset < 0)
             throw new IllegalArgumentException(Long.toString(startingOffset));
         if (startingOffset == 0)
             return this;
         else
-            return super.slice(startingOffset, -1);
+            return slice(startingOffset, -1);
     }
 
     @Override
-    public DoubleStream substream(long startingOffset, long endingOffset) {
+    public final DoubleStream substream(long startingOffset, long endingOffset) {
         if (startingOffset < 0 || endingOffset < startingOffset)
             throw new IllegalArgumentException(String.format("substream(%d, %d)", startingOffset, endingOffset));
-        return super.slice(startingOffset, endingOffset - startingOffset);
+        return slice(startingOffset, endingOffset - startingOffset);
     }
 
     @Override
-    public DoubleStream sorted() {
-        return pipeline(new SortedOp.OfDouble());
+    public final DoubleStream sorted() {
+        return SortedOps.makeDouble(this);
     }
 
     @Override
-    public DoubleStream distinct() {
+    public final DoubleStream distinct() {
         // @@@ While functional and quick to implement this approach is not very efficient.
         //     An efficient version requires an double-specific map/set implementation.
         return boxed().distinct().mapToDouble(i -> (double) i);
@@ -290,33 +334,33 @@
     // Terminal ops from DoubleStream
 
     @Override
-    public void forEach(DoubleConsumer consumer) {
+    public final void forEach(DoubleConsumer consumer) {
         evaluate(ForEachOps.makeDouble(consumer, false));
     }
 
     @Override
-    public void forEachOrdered(DoubleConsumer consumer) {
+    public final void forEachOrdered(DoubleConsumer consumer) {
         evaluate(ForEachOps.makeDouble(consumer, true));
     }
 
     @Override
-    public double sum() {
+    public final double sum() {
         // better algorithm to compensate for errors ?
         return reduce(0.0, Double::sum);
     }
 
     @Override
-    public OptionalDouble min() {
+    public final OptionalDouble min() {
         return reduce(Math::min);
     }
 
     @Override
-    public OptionalDouble max() {
+    public final OptionalDouble max() {
         return reduce(Math::max);
     }
 
     @Override
-    public OptionalDouble average() {
+    public final OptionalDouble average() {
         double[] avg = collect(() -> new double[2],
                                (ll, i) -> {
                                    ll[0]++;
@@ -330,27 +374,27 @@
     }
 
     @Override
-    public long count() {
+    public final long count() {
         return mapToObj(e -> null).mapToInt(e -> 1).sum();
     }
 
     @Override
-    public DoubleSummaryStatistics summaryStatistics() {
+    public final DoubleSummaryStatistics summaryStatistics() {
         return collect(DoubleSummaryStatistics::new, DoubleSummaryStatistics::accept, DoubleSummaryStatistics::combine);
     }
 
     @Override
-    public double reduce(double identity, DoubleBinaryOperator op) {
+    public final double reduce(double identity, DoubleBinaryOperator op) {
         return evaluate(ReduceOps.makeDouble(identity, op));
     }
 
     @Override
-    public OptionalDouble reduce(DoubleBinaryOperator op) {
+    public final OptionalDouble reduce(DoubleBinaryOperator op) {
         return evaluate(ReduceOps.makeDouble(op));
     }
 
     @Override
-    public <R> R collect(Supplier<R> resultFactory, ObjDoubleConsumer<R> accumulator, BiConsumer<R, R> combiner) {
+    public final <R> R collect(Supplier<R> resultFactory, ObjDoubleConsumer<R> accumulator, BiConsumer<R, R> combiner) {
         BinaryOperator<R> operator = (left, right) -> {
             combiner.accept(left, right);
             return left;
@@ -359,32 +403,88 @@
     }
 
     @Override
-    public boolean anyMatch(DoublePredicate predicate) {
+    public final boolean anyMatch(DoublePredicate predicate) {
         return evaluate(MatchOps.makeDouble(predicate, MatchOps.MatchKind.ANY));
     }
 
     @Override
-    public boolean allMatch(DoublePredicate predicate) {
+    public final boolean allMatch(DoublePredicate predicate) {
         return evaluate(MatchOps.makeDouble(predicate, MatchOps.MatchKind.ALL));
     }
 
     @Override
-    public boolean noneMatch(DoublePredicate predicate) {
+    public final boolean noneMatch(DoublePredicate predicate) {
         return evaluate(MatchOps.makeDouble(predicate, MatchOps.MatchKind.NONE));
     }
 
     @Override
-    public OptionalDouble findFirst() {
+    public final OptionalDouble findFirst() {
         return evaluate(FindOps.makeDouble(true));
     }
 
     @Override
-    public OptionalDouble findAny() {
+    public final OptionalDouble findAny() {
         return evaluate(FindOps.makeDouble(false));
     }
 
     @Override
-    public double[] toArray() {
+    public final double[] toArray() {
         return ((Node.OfDouble) evaluateToArrayNode(Double[]::new)).asDoubleArray();
     }
+
+
+    //
+
+    static class Head<E_IN> extends DoublePipeline<E_IN> {
+        Head(Supplier<? extends Spliterator<Double>> source, int sourceFlags) {
+            super(source, sourceFlags);
+        }
+
+        Head(Spliterator<Double> source, int sourceFlags) {
+            super(source, sourceFlags);
+        }
+
+        @Override
+        final boolean opIsStateful() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        final Sink<E_IN> opWrapSink(int flags, Sink<Double> sink) {
+            throw new UnsupportedOperationException();
+        }
+    }
+
+    abstract static class StatelessOp<E_IN> extends DoublePipeline<E_IN> {
+        StatelessOp(AbstractPipeline<?, E_IN, ?> upstream,
+                    StreamShape inputShape,
+                    int opFlags) {
+            super(upstream, opFlags);
+            assert upstream.getOutputShape() == inputShape;
+        }
+
+        @Override
+        final boolean opIsStateful() {
+            return false;
+        }
+    }
+
+    abstract static class StatefulOp<E_IN> extends DoublePipeline<E_IN> {
+        StatefulOp(AbstractPipeline<?, E_IN, ?> upstream,
+                   StreamShape inputShape,
+                   int opFlags) {
+            super(upstream, opFlags);
+            assert upstream.getOutputShape() == inputShape;
+        }
+
+        @Override
+        final boolean opIsStateful() {
+            return true;
+        }
+
+        @Override
+        abstract <P_IN> Node<Double> opEvaluateParallel(PipelineHelper<Double> helper,
+                                                        Spliterator<P_IN> spliterator,
+                                                        IntFunction<Double[]> generator);
+    }
 }
--- a/src/share/classes/java/util/stream/FindOps.java	Wed Apr 03 11:29:23 2013 -0700
+++ b/src/share/classes/java/util/stream/FindOps.java	Wed Apr 03 21:51:55 2013 +0200
@@ -143,13 +143,13 @@
         }
 
         @Override
-        public <S> O evaluateSequential(PipelineHelper<S, T> helper, Spliterator<S> spliterator) {
+        public <S> O evaluateSequential(PipelineHelper<T> helper, Spliterator<S> spliterator) {
             O result = helper.wrapAndCopyInto(sinkSupplier.get(), spliterator).get();
             return result != null ? result : emptyValue;
         }
 
         @Override
-        public <P_IN> O evaluateParallel(PipelineHelper<P_IN, T> helper, Spliterator<P_IN> spliterator) {
+        public <P_IN> O evaluateParallel(PipelineHelper<T> helper, Spliterator<P_IN> spliterator) {
             return new FindTask<>(this, helper, spliterator).invoke();
         }
     }
@@ -240,7 +240,7 @@
     private static final class FindTask<S, T, O> extends AbstractShortCircuitTask<S, T, O, FindTask<S, T, O>> {
         private final FindOp<T, O> op;
 
-        FindTask(FindOp<T, O> op, PipelineHelper<S, T> helper, Spliterator<S> spliterator) {
+        FindTask(FindOp<T, O> op, PipelineHelper<T> helper, Spliterator<S> spliterator) {
             super(helper, spliterator);
             this.op = op;
         }
--- a/src/share/classes/java/util/stream/ForEachOps.java	Wed Apr 03 11:29:23 2013 -0700
+++ b/src/share/classes/java/util/stream/ForEachOps.java	Wed Apr 03 21:51:55 2013 +0200
@@ -143,12 +143,12 @@
         }
 
         @Override
-        public <S> Void evaluateSequential(PipelineHelper<S, T> helper, Spliterator<S> spliterator) {
+        public <S> Void evaluateSequential(PipelineHelper<T> helper, Spliterator<S> spliterator) {
             return helper.wrapAndCopyInto(this, spliterator).get();
         }
 
         @Override
-        public <S> Void evaluateParallel(PipelineHelper<S, T> helper, Spliterator<S> spliterator) {
+        public <S> Void evaluateParallel(PipelineHelper<T> helper, Spliterator<S> spliterator) {
             if (ordered)
                 new ForEachOrderedTask<>(helper, spliterator, this).invoke();
             else
@@ -245,10 +245,10 @@
     private static class ForEachTask<S, T> extends CountedCompleter<Void> {
         private Spliterator<S> spliterator;
         private final Sink<S> sink;
-        private final PipelineHelper<S, T> helper;
+        private final PipelineHelper<T> helper;
         private final long targetSize;
 
-        ForEachTask(PipelineHelper<S, T> helper, Spliterator<S> spliterator, Sink<S> sink) {
+        ForEachTask(PipelineHelper<T> helper, Spliterator<S> spliterator, Sink<S> sink) {
             super(null);
             this.spliterator = spliterator;
             this.sink = sink;
@@ -291,7 +291,7 @@
 
     /** A {@code ForkJoinTask} for performing a parallel ordered for-each operation */
     private static class ForEachOrderedTask<S, T> extends CountedCompleter<Void> {
-        private final PipelineHelper<S, T> helper;
+        private final PipelineHelper<T> helper;
         private Spliterator<S> spliterator;
         private final long targetSize;
         private final ConcurrentHashMap<ForEachOrderedTask<S, T>, ForEachOrderedTask<S, T>> completionMap;
@@ -300,7 +300,7 @@
         private final ForEachOrderedTask<S, T> leftPredecessor;
         private Node<T> node;
 
-        protected ForEachOrderedTask(PipelineHelper<S, T> helper, Spliterator<S> spliterator, Sink<T> action) {
+        protected ForEachOrderedTask(PipelineHelper<T> helper, Spliterator<S> spliterator, Sink<T> action) {
             super(null);
             this.helper = helper;
             this.spliterator = spliterator;
--- a/src/share/classes/java/util/stream/IntPipeline.java	Wed Apr 03 11:29:23 2013 -0700
+++ b/src/share/classes/java/util/stream/IntPipeline.java	Wed Apr 03 21:51:55 2013 +0200
@@ -48,15 +48,16 @@
  * @param <E_IN> Type of elements in the upstream source.
  * @since 1.8
  */
-class IntPipeline<E_IN> extends AbstractPipeline<E_IN, Integer, IntStream> implements IntStream {
+abstract class IntPipeline<E_IN> extends AbstractPipeline<E_IN, Integer, IntStream> implements IntStream {
 
     /**
      * Constructor for the head of a stream pipeline.
      *
      * @param source {@code Supplier<Spliterator>} describing the stream source
-     * @param sourceFlags The source flags for the stream source, described in {@link StreamOpFlag}
+     * @param sourceFlags The source flags for the stream source, described in
+     * {@link StreamOpFlag}
      */
-    public IntPipeline(Supplier<? extends Spliterator<Integer>> source, int sourceFlags) {
+    IntPipeline(Supplier<? extends Spliterator<Integer>> source, int sourceFlags) {
         super(source, sourceFlags);
     }
 
@@ -64,54 +65,22 @@
      * Constructor for the head of a stream pipeline.
      *
      * @param source {@code Spliterator} describing the stream source
-     * @param sourceFlags The source flags for the stream source, described in {@link StreamOpFlag}
+     * @param sourceFlags The source flags for the stream source, described in
+     * {@link StreamOpFlag}
      */
-    public IntPipeline(Spliterator<Integer> source, int sourceFlags) {
+    IntPipeline(Spliterator<Integer> source, int sourceFlags) {
         super(source, sourceFlags);
     }
 
     /**
-     * Constructor for appending an intermediate operation onto an existing pipeline.
+     * Constructor for appending an intermediate operation onto an existing
+     * pipeline.
      *
-     * @param upstream the upstream element source.
-     * @param op the operation performed upon elements.
+     * @param upstream the upstream element source
+     * @param opFlags the operation flags for the new operation
      */
-    public IntPipeline(AbstractPipeline<?, E_IN, ?> upstream, IntermediateOp<E_IN, Integer> op) {
-        super(upstream, op);
-    }
-
-    // Methods from AbstractPipeline
-
-    @Override
-    protected StreamShape getOutputShape() {
-        return StreamShape.INT_VALUE;
-    }
-
-    @Override
-    protected <P_IN> Node<Integer> evaluateToNode(PipelineHelper<P_IN, Integer> helper,
-                                                  Spliterator<P_IN> spliterator,
-                                                  boolean flattenTree,
-                                                  IntFunction<Integer[]> generator) {
-        return NodeUtils.intCollect(helper, spliterator, flattenTree);
-    }
-
-    @Override
-    protected <P_IN> Spliterator<Integer> wrap(PipelineHelper<P_IN, Integer> ph,
-                                               Supplier<Spliterator<P_IN>> supplier,
-                                               boolean isParallel) {
-        return new StreamSpliterators.IntWrappingSpliterator<>(ph, supplier, isParallel);
-    }
-
-    @Override
-    protected Spliterator.OfInt lazySpliterator(Supplier<? extends Spliterator<Integer>> supplier) {
-        return new StreamSpliterators.DelegatingSpliterator.OfInt((Supplier<Spliterator.OfInt>) supplier);
-    }
-
-    @Override
-    protected void forEachWithCancel(Spliterator<Integer> spliterator, Sink<Integer> sink) {
-        Spliterator.OfInt spl = adapt(spliterator);
-        IntConsumer adaptedSink = adapt(sink);
-        while (!sink.cancellationRequested() && spl.tryAdvance(adaptedSink)) { }
+    IntPipeline(AbstractPipeline<?, E_IN, ?> upstream, int opFlags) {
+        super(upstream, opFlags);
     }
 
     private static IntConsumer adapt(Sink<Integer> sink) {
@@ -125,16 +94,6 @@
         }
     }
 
-    @Override
-    protected Node.Builder<Integer> makeNodeBuilder(long exactSizeIfKnown, IntFunction<Integer[]> generator) {
-        return Nodes.intMakeBuilder(exactSizeIfKnown);
-    }
-
-    @Override
-    public Spliterator.OfInt spliterator() {
-        return adapt(super.spliterator());
-    }
-
     private static Spliterator.OfInt adapt(Spliterator<Integer> s) {
         if (s instanceof Spliterator.OfInt) {
             return (Spliterator.OfInt) s;
@@ -146,164 +105,260 @@
         }
     }
 
+
+    // Shape-specific methods
+
     @Override
-    public PrimitiveIterator.OfInt iterator() {
+    final StreamShape getOutputShape() {
+        return StreamShape.INT_VALUE;
+    }
+
+    @Override
+    final <P_IN> Node<Integer> evaluateToNode(PipelineHelper<Integer> helper,
+                                              Spliterator<P_IN> spliterator,
+                                              boolean flattenTree,
+                                              IntFunction<Integer[]> generator) {
+        return NodeUtils.intCollect(helper, spliterator, flattenTree);
+    }
+
+    @Override
+    final <P_IN> Spliterator<Integer> wrap(PipelineHelper<Integer> ph,
+                                           Supplier<Spliterator<P_IN>> supplier,
+                                           boolean isParallel) {
+        return new StreamSpliterators.IntWrappingSpliterator<>(ph, supplier, isParallel);
+    }
+
+    @Override
+    final Spliterator.OfInt lazySpliterator(Supplier<? extends Spliterator<Integer>> supplier) {
+        return new StreamSpliterators.DelegatingSpliterator.OfInt((Supplier<Spliterator.OfInt>) supplier);
+    }
+
+    @Override
+    final void forEachWithCancel(Spliterator<Integer> spliterator, Sink<Integer> sink) {
+        Spliterator.OfInt spl = adapt(spliterator);
+        IntConsumer adaptedSink = adapt(sink);
+        while (!sink.cancellationRequested() && spl.tryAdvance(adaptedSink)) { }
+    }
+
+    @Override
+    final Node.Builder<Integer> makeNodeBuilder(long exactSizeIfKnown, IntFunction<Integer[]> generator) {
+        return Nodes.intMakeBuilder(exactSizeIfKnown);
+    }
+
+
+    // IntStream
+
+    @Override
+    public final PrimitiveIterator.OfInt iterator() {
         return Spliterators.iteratorFromSpliterator(spliterator());
     }
 
+    @Override
+    public final Spliterator.OfInt spliterator() {
+        return adapt(super.spliterator());
+    }
+
     // Stateless intermediate ops from IntStream
 
     @Override
-    public LongStream longs() {
-        return chainedToLong(StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT, StreamShape.INT_VALUE,
-                             (flags, sink) -> new Sink.ChainedInt(sink) {
-                                 @Override
-                                 public void accept(int t) {
-                                     downstream.accept((long) t);
-                                 }
-                             });
+    public final LongStream longs() {
+        return new LongPipeline.StatelessOp<Integer>(this, StreamShape.INT_VALUE,
+                                                     StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
+            @Override
+            Sink<Integer> opWrapSink(int flags, Sink<Long> sink) {
+                return new Sink.ChainedInt(sink) {
+                    @Override
+                    public void accept(int t) {
+                        downstream.accept((long) t);
+                    }
+                };
+            }
+        };
     }
 
     @Override
-    public DoubleStream doubles() {
-        return chainedToDouble(StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT, StreamShape.INT_VALUE,
-                               (flags, sink) -> new Sink.ChainedInt(sink) {
-                                   @Override
-                                   public void accept(int t) {
-                                       downstream.accept((double) t);
-                                   }
-                               });
+    public final DoubleStream doubles() {
+        return new DoublePipeline.StatelessOp<Integer>(this, StreamShape.INT_VALUE,
+                                                       StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
+            @Override
+            Sink<Integer> opWrapSink(int flags, Sink<Double> sink) {
+                return new Sink.ChainedInt(sink) {
+                    @Override
+                    public void accept(int t) {
+                        downstream.accept((double) t);
+                    }
+                };
+            }
+        };
     }
 
     @Override
-    public Stream<Integer> boxed() {
+    public final Stream<Integer> boxed() {
         return mapToObj(Integer::valueOf);
     }
 
     @Override
-    public IntStream map(IntUnaryOperator mapper) {
+    public final IntStream map(IntUnaryOperator mapper) {
         Objects.requireNonNull(mapper);
-        return chainedToInt(StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT, StreamShape.INT_VALUE,
-                            (flags, sink) -> new Sink.ChainedInt(sink) {
-                                @Override
-                                public void accept(int t) {
-                                    downstream.accept(mapper.applyAsInt(t));
-                                }
-                            });
+        return new StatelessOp<Integer>(this, StreamShape.INT_VALUE,
+                                        StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
+            @Override
+            Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) {
+                return new Sink.ChainedInt(sink) {
+                    @Override
+                    public void accept(int t) {
+                        downstream.accept(mapper.applyAsInt(t));
+                    }
+                };
+            }
+        };
     }
 
     @Override
-    public <U> Stream<U> mapToObj(IntFunction<U> mapper) {
+    public final <U> Stream<U> mapToObj(IntFunction<U> mapper) {
         Objects.requireNonNull(mapper);
-        return chainedToRef(StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT, StreamShape.INT_VALUE,
-                            (flags, sink) -> new Sink.ChainedInt(sink) {
-                                @Override
-                                public void accept(int t) {
-                                    downstream.accept(mapper.apply(t));
-                                }
-                            });
+        return new ReferencePipeline.StatelessOp<Integer, U>(this, StreamShape.INT_VALUE,
+                                                             StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
+            @Override
+            Sink<Integer> opWrapSink(int flags, Sink<U> sink) {
+                return new Sink.ChainedInt(sink) {
+                    @Override
+                    public void accept(int t) {
+                        downstream.accept(mapper.apply(t));
+                    }
+                };
+            }
+        };
     }
 
     @Override
-    public LongStream mapToLong(IntToLongFunction mapper) {
+    public final LongStream mapToLong(IntToLongFunction mapper) {
         Objects.requireNonNull(mapper);
-        return chainedToLong(StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT, StreamShape.INT_VALUE,
-                             (flags, sink) -> new Sink.ChainedInt(sink) {
-                                 @Override
-                                 public void accept(int t) {
-                                     downstream.accept(mapper.applyAsLong(t));
-                                 }
-                             });
+        return new LongPipeline.StatelessOp<Integer>(this, StreamShape.INT_VALUE,
+                                                     StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
+            @Override
+            Sink<Integer> opWrapSink(int flags, Sink<Long> sink) {
+                return new Sink.ChainedInt(sink) {
+                    @Override
+                    public void accept(int t) {
+                        downstream.accept(mapper.applyAsLong(t));
+                    }
+                };
+            }
+        };
     }
 
     @Override
-    public DoubleStream mapToDouble(IntToDoubleFunction mapper) {
+    public final DoubleStream mapToDouble(IntToDoubleFunction mapper) {
         Objects.requireNonNull(mapper);
-        return chainedToDouble(StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT, StreamShape.INT_VALUE,
-                               (flags, sink) -> new Sink.ChainedInt(sink) {
-                                   @Override
-                                   public void accept(int t) {
-                                       downstream.accept(mapper.applyAsDouble(t));
-                                   }
-                               });
+        return new DoublePipeline.StatelessOp<Integer>(this, StreamShape.INT_VALUE,
+                                                       StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
+            @Override
+            Sink<Integer> opWrapSink(int flags, Sink<Double> sink) {
+                return new Sink.ChainedInt(sink) {
+                    @Override
+                    public void accept(int t) {
+                        downstream.accept(mapper.applyAsDouble(t));
+                    }
+                };
+            }
+        };
     }
 
     @Override
-    public IntStream flatMap(IntFunction<? extends IntStream> mapper) {
+    public final IntStream flatMap(IntFunction<? extends IntStream> mapper) {
         return flatMap((int i, IntConsumer sink) -> mapper.apply(i).sequential().forEach(sink));
     }
 
     @Override
-    public IntStream flatMap(FlatMapper.OfIntToInt mapper) {
+    public final IntStream flatMap(FlatMapper.OfIntToInt mapper) {
         Objects.requireNonNull(mapper);
-        return chainedToInt(StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED,
-                            StreamShape.INT_VALUE,
-                            (flags, sink) -> new Sink.ChainedInt(sink) {
-                                public void accept(int t) {
-                                    mapper.flattenInto(t, (Sink.OfInt) downstream);
-                                }
-                            });
+        return new StatelessOp<Integer>(this, StreamShape.INT_VALUE,
+                                        StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
+            @Override
+            Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) {
+                return new Sink.ChainedInt(sink) {
+                    public void accept(int t) {
+                        mapper.flattenInto(t, (Sink.OfInt) downstream);
+                    }
+                };
+            }
+        };
     }
 
     @Override
-    public IntStream filter(IntPredicate predicate) {
+    public final IntStream filter(IntPredicate predicate) {
         Objects.requireNonNull(predicate);
-        return chainedToInt(StreamOpFlag.NOT_SIZED, StreamShape.INT_VALUE,
-                            (flags, sink) -> new Sink.ChainedInt(sink) {
-                                @Override
-                                public void accept(int t) {
-                                    if (predicate.test(t))
-                                        downstream.accept(t);
-                                }
-                            });
+        return new StatelessOp<Integer>(this, StreamShape.INT_VALUE,
+                                        StreamOpFlag.NOT_SIZED) {
+            @Override
+            Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) {
+                return new Sink.ChainedInt(sink) {
+                    @Override
+                    public void accept(int t) {
+                        if (predicate.test(t))
+                            downstream.accept(t);
+                    }
+                };
+            }
+        };
     }
 
     @Override
-    public IntStream peek(IntConsumer consumer) {
+    public final IntStream peek(IntConsumer consumer) {
         Objects.requireNonNull(consumer);
-        return chainedToInt(0, StreamShape.INT_VALUE,
-                            (flags, sink) -> new Sink.ChainedInt(sink) {
-                                @Override
-                                public void accept(int t) {
-                                    consumer.accept(t);
-                                    downstream.accept(t);
-                                }
-                            });
+        return new StatelessOp<Integer>(this, StreamShape.INT_VALUE,
+                                        0) {
+            @Override
+            Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) {
+                return new Sink.ChainedInt(sink) {
+                    @Override
+                    public void accept(int t) {
+                        consumer.accept(t);
+                        downstream.accept(t);
+                    }
+                };
+            }
+        };
     }
 
     // Stateful intermediate ops from IntStream
 
-    @Override
-    public IntStream limit(long maxSize) {
-        if (maxSize < 0)
-            throw new IllegalArgumentException(Long.toString(maxSize));
-        return super.slice(0, maxSize);
+    private IntStream slice(long skip, long limit) {
+        return SliceOps.makeInt(this, skip, limit);
     }
 
     @Override
-    public IntStream substream(long startingOffset) {
+    public final IntStream limit(long maxSize) {
+        if (maxSize < 0)
+            throw new IllegalArgumentException(Long.toString(maxSize));
+        return slice(0, maxSize);
+    }
+
+    @Override
+    public final IntStream substream(long startingOffset) {
         if (startingOffset < 0)
             throw new IllegalArgumentException(Long.toString(startingOffset));
         if (startingOffset == 0)
             return this;
         else
-            return super.slice(startingOffset, -1);
+            return slice(startingOffset, -1);
     }
 
     @Override
-    public IntStream substream(long startingOffset, long endingOffset) {
+    public final IntStream substream(long startingOffset, long endingOffset) {
         if (startingOffset < 0 || endingOffset < startingOffset)
             throw new IllegalArgumentException(String.format("substream(%d, %d)", startingOffset, endingOffset));
-        return super.slice(startingOffset, endingOffset - startingOffset);
+        return slice(startingOffset, endingOffset - startingOffset);
     }
 
     @Override
-    public IntStream sorted() {
-        return pipeline(new SortedOp.OfInt());
+    public final IntStream sorted() {
+        return SortedOps.makeInt(this);
     }
 
     @Override
-    public IntStream distinct() {
+    public final IntStream distinct() {
         // @@@ While functional and quick to implement this approach is not very efficient.
         //     An efficient version requires an int-specific map/set implementation.
         return boxed().distinct().mapToInt(i -> (int) i);
@@ -312,37 +367,37 @@
     // Terminal ops from IntStream
 
     @Override
-    public void forEach(IntConsumer consumer) {
+    public final void forEach(IntConsumer consumer) {
         evaluate(ForEachOps.makeInt(consumer, false));
     }
 
     @Override
-    public void forEachOrdered(IntConsumer consumer) {
+    public final void forEachOrdered(IntConsumer consumer) {
         evaluate(ForEachOps.makeInt(consumer, true));
     }
 
     @Override
-    public int sum() {
+    public final int sum() {
         return reduce(0, Integer::sum);
     }
 
     @Override
-    public OptionalInt min() {
+    public final OptionalInt min() {
         return reduce(Math::min);
     }
 
     @Override
-    public OptionalInt max() {
+    public final OptionalInt max() {
         return reduce(Math::max);
     }
 
     @Override
-    public long count() {
+    public final long count() {
         return longs().map(e -> 1L).sum();
     }
 
     @Override
-    public OptionalDouble average() {
+    public final OptionalDouble average() {
         long[] avg = collect(() -> new long[2],
                              (ll, i) -> {
                                  ll[0]++;
@@ -356,22 +411,22 @@
     }
 
     @Override
-    public IntSummaryStatistics summaryStatistics() {
+    public final IntSummaryStatistics summaryStatistics() {
         return collect(IntSummaryStatistics::new, IntSummaryStatistics::accept, IntSummaryStatistics::combine);
     }
 
     @Override
-    public int reduce(int identity, IntBinaryOperator op) {
+    public final int reduce(int identity, IntBinaryOperator op) {
         return evaluate(ReduceOps.makeInt(identity, op));
     }
 
     @Override
-    public OptionalInt reduce(IntBinaryOperator op) {
+    public final OptionalInt reduce(IntBinaryOperator op) {
         return evaluate(ReduceOps.makeInt(op));
     }
 
     @Override
-    public <R> R collect(Supplier<R> resultFactory, ObjIntConsumer<R> accumulator, BiConsumer<R, R> combiner) {
+    public final <R> R collect(Supplier<R> resultFactory, ObjIntConsumer<R> accumulator, BiConsumer<R, R> combiner) {
         BinaryOperator<R> operator = (left, right) -> {
             combiner.accept(left, right);
             return left;
@@ -380,32 +435,88 @@
     }
 
     @Override
-    public boolean anyMatch(IntPredicate predicate) {
+    public final boolean anyMatch(IntPredicate predicate) {
         return evaluate(MatchOps.makeInt(predicate, MatchOps.MatchKind.ANY));
     }
 
     @Override
-    public boolean allMatch(IntPredicate predicate) {
+    public final boolean allMatch(IntPredicate predicate) {
         return evaluate(MatchOps.makeInt(predicate, MatchOps.MatchKind.ALL));
     }
 
     @Override
-    public boolean noneMatch(IntPredicate predicate) {
+    public final boolean noneMatch(IntPredicate predicate) {
         return evaluate(MatchOps.makeInt(predicate, MatchOps.MatchKind.NONE));
     }
 
     @Override
-    public OptionalInt findFirst() {
+    public final OptionalInt findFirst() {
         return evaluate(FindOps.makeInt(true));
     }
 
     @Override
-    public OptionalInt findAny() {
+    public final OptionalInt findAny() {
         return evaluate(FindOps.makeInt(false));
     }
 
     @Override
-    public int[] toArray() {
+    public final int[] toArray() {
         return ((Node.OfInt) evaluateToArrayNode(Integer[]::new)).asIntArray();
     }
+
+
+    //
+
+    static class Head<E_IN> extends IntPipeline<E_IN> {
+        Head(Supplier<? extends Spliterator<Integer>> source, int sourceFlags) {
+            super(source, sourceFlags);
+        }
+
+        Head(Spliterator<Integer> source, int sourceFlags) {
+            super(source, sourceFlags);
+        }
+
+        @Override
+        final boolean opIsStateful() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        final Sink<E_IN> opWrapSink(int flags, Sink<Integer> sink) {
+            throw new UnsupportedOperationException();
+        }
+    }
+
+    abstract static class StatelessOp<E_IN> extends IntPipeline<E_IN> {
+        StatelessOp(AbstractPipeline<?, E_IN, ?> upstream,
+                    StreamShape inputShape,
+                    int opFlags) {
+            super(upstream, opFlags);
+            assert upstream.getOutputShape() == inputShape;
+        }
+
+        @Override
+        final boolean opIsStateful() {
+            return false;
+        }
+    }
+
+    abstract static class StatefulOp<E_IN> extends IntPipeline<E_IN> {
+        StatefulOp(AbstractPipeline<?, E_IN, ?> upstream,
+                   StreamShape inputShape,
+                   int opFlags) {
+            super(upstream, opFlags);
+            assert upstream.getOutputShape() == inputShape;
+        }
+
+        @Override
+        final boolean opIsStateful() {
+            return true;
+        }
+
+        @Override
+        abstract <P_IN> Node<Integer> opEvaluateParallel(PipelineHelper<Integer> helper,
+                                                         Spliterator<P_IN> spliterator,
+                                                         IntFunction<Integer[]> generator);
+    }
 }
--- a/src/share/classes/java/util/stream/IntermediateOp.java	Wed Apr 03 11:29:23 2013 -0700
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,197 +0,0 @@
-/*
- * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This code is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 only, as
- * published by the Free Software Foundation.  Oracle designates this
- * particular file as subject to the "Classpath" exception as provided
- * by Oracle in the LICENSE file that accompanied this code.
- *
- * This code is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
- * version 2 for more details (a copy is included in the LICENSE file that
- * accompanied this code).
- *
- * You should have received a copy of the GNU General Public License version
- * 2 along with this work; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
- *
- * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
- * or visit www.oracle.com if you need additional information or have any
- * questions.
- */
-package java.util.stream;
-
-import java.util.Spliterator;
-import java.util.function.IntFunction;
-
-/**
- * An operation in a stream pipeline that takes a stream as input and produces
- * a stream, possibly of a different type, as output.  An intermediate operation
- * has an input type and an output type, reflected in its type parameters
- * {@code E_IN} and {@code E_OUT}, and, an associated input shape and
- * output shape.  An intermediate operation also has a set of <em>operation
- * flags</em> that describes how it transforms characteristics of the stream
- * (such as sortedness or size; see {@link StreamOpFlag}).
- *
- * <p>Intermediate operations are implemented in terms of <em>sink transforms
- * </em>; given a {@code Sink} for the output type of the operation, produce a
- * {@code Sink} for the input type of the operation, which, when fed with
- * values, has the effect of implementing the desired operation on the input
- * values and feeding them to the output sink.
- *
- * <p>Some intermediate operations are <em>stateful</em>.  This means that the
- * sinks they produce as a result of the above wrapping may maintain state from
- * processing earlier elements.  Stateful intermediate operations must implement
- * the {@link StatefulOp} interface.  Statefulness has an effect on how the
- * operation can be parallelized.  Stateless operations parallelize trivially
- * because they are homomorphisms under concatenation:
- *
- * <pre>
- *     statelessOp(a || b) = statelessOp(a) || statelessOp(b)
- * </pre>
- *
- * where {@code ||} denotes concatenation.  Stateful operations may still be
- * parallelizable, but are not amenable to the automatic parallelization of
- * stateless operations.  Accordingly, a stateful operation must provide its own
- * parallel execution implementation
- * ({@link IntermediateOp#evaluateParallel(PipelineHelper, java.util.Spliterator, java.util.function.IntFunction)}).
- *
- * @apiNote
- * As an example, consider the stream pipeline:
- * <pre>
- *     int oldestBob = people.stream()
- *                            .filter(p -> p.getFirstName.equals("Bob"))
- *                            .mapToInt(p -> p.getAge())
- *                            .max();
- * </pre>
- *
- * <p>This pipeline has two intermediate operations, filter and map.  The
- * filtering operation has input and output types of {@code Person} (with input
- * and output shape of {@code REFERENCE}), and the mapping operation has an
- * input type of {@code Person} and an output type of {@code Integer} (with
- * shape {@code INT_VALUE}.)  When we construct a sink chain, the mapping
- * operation will be asked to transform a {@code Sink.OfInt} which computes the
- * maximum value into a {@code Sink} which accepts {@code Person} objects, and
- * whose behavior is to take the supplied {@code Person}, call {@code getAge()}
- * on it, and pass the resulting value to the downstream sink.  This sink
- * transform might be implement as:
- *
- * <pre>
- *     new Sink.ChainedReference<U>(sink) {
- *         public void accept(U u) {
- *             downstream.accept(mappingFunction.applyAsInt(u));
- *         }
- *     }
- * </pre>
- *
- * @param <E_IN>  Type of input elements to the operation
- * @param <E_OUT> Type of output elements to the operation
- * @see TerminalOp
- * @see StatefulOp
- * @since 1.8
- */
-interface IntermediateOp<E_IN, E_OUT> {
-
-    /**
-     * Gets the shape of the input type of this operation
-     *
-     * @implSpec The default returns {@code StreamShape.REFERENCE}
-     * @return Shape of the input type of this operation
-     */
-    default StreamShape inputShape() { return StreamShape.REFERENCE; }
-
-    /**
-     * Gets the shape of the output type of this operation
-     *
-     * @implSpec The default returns {@code StreamShape.REFERENCE}
-     * @return Shape of the output type of this operation
-     */
-    default StreamShape outputShape() { return StreamShape.REFERENCE; }
-
-    /**
-     * Gets the operation flags of this operation.
-     *
-     * @implSpec The default returns {@code 0}
-     * @return a bitmap describing the operation flags of this operation
-     * @see StreamOpFlag
-     */
-    default int getOpFlags() { return 0; }
-
-    /**
-     * Returns whether this operation is stateful or not.  If it is stateful,
-     * then the method
-     * {@link #evaluateParallel(PipelineHelper, java.util.Spliterator, java.util.function.IntFunction)}
-     * must be overridden.
-     *
-     * @implSpec The default implementation returns {@code false}.
-     * @return {@code true} if this operation is stateful
-     */
-    default boolean isStateful() { return false; }
-
-    /**
-     * Accepts a {@code Sink} which will receive the results of this operation,
-     * and return a {@code Sink} which accepts elements of the input type of
-     * this operation and which performs the operation, passing the results to
-     * the provided {@code Sink}.
-     *
-     * <p>The implementation may use the {@code flags} parameter to optimize the
-     * sink wrapping.  For example, if the input is already {@code DISTINCT},
-     * the implementation for the {@code Stream#distinct()} method could just
-     * return the sink it was passed.
-     *
-     * @param flags The combined stream and operation flags up to, but not
-     *        including, this operation.
-     * @param sink elements will be sent to this sink after the processing.
-     * @return a sink which will accept elements and perform the operation upon
-     *         each element, passing the results (if any) to the provided
-     *         {@code Sink}.
-     */
-    Sink<E_IN> wrapSink(int flags, Sink<E_OUT> sink);
-
-    /**
-     * Performs a parallel evaluation of the operation using the specified
-     * {@code PipelineHelper} which describes the stream source and upstream
-     * intermediate operations.  Only called on stateful operations.  If
-     * {@link #isStateful()} returns true then implementations must override the
-     * default implementation.
-     *
-     * @implSpec The default implementation throws an
-     * {@link UnsupportedOperationException}
-     *
-     * @param helper the pipeline helper
-     * @param spliterator the source {@code Spliterator}
-     * @param generator the array generator
-     * @return a {@code Node} describing the result of the evaluation
-     */
-    default <P_IN> Node<E_OUT> evaluateParallel(PipelineHelper<P_IN, E_OUT> helper,
-                                                Spliterator<P_IN> spliterator,
-                                                IntFunction<E_OUT[]> generator) {
-        throw new UnsupportedOperationException("Parallel evaluation is not supported");
-    }
-
-    /**
-     * Returns a {@code Spliterator} describing a parallel evaluation of the operation using
-     * the specified {@code PipelineHelper} which describes the stream source and upstream
-     * intermediate operations.  Only called on stateful operations.  It is not necessary
-     * (though acceptable) to do a full computation of the result here; it is preferable, if
-     * possible, to describe the result via a lazily evaluated spliterator.
-     *
-     * @implSpec The default implementation behaves as if:
-     * <pre>{@code
-     *     return evaluateParallel(helper, i -> (E_OUT[]) new Object[i]).spliterator();
-     * }</pre>
-     * and is suitable for implementations that cannot do better than a full synchronous
-     * evaluation.
-     *
-     * @param helper the pipeline helper
-     * @param spliterator the source {@code Spliterator}
-     * @return a {@code Spliterator} describing the result of the evaluation
-     */
-    default <P_IN> Spliterator<E_OUT> evaluateParallelLazy(PipelineHelper<P_IN, E_OUT> helper,
-                                                           Spliterator<P_IN> spliterator) {
-        return evaluateParallel(helper, spliterator, i -> (E_OUT[]) new Object[i]).spliterator();
-    }
-}
--- a/src/share/classes/java/util/stream/LongPipeline.java	Wed Apr 03 11:29:23 2013 -0700
+++ b/src/share/classes/java/util/stream/LongPipeline.java	Wed Apr 03 21:51:55 2013 +0200
@@ -49,7 +49,7 @@
  * @param <E_IN> Type of elements in the upstream source.
  * @since 1.8
  */
-class LongPipeline<E_IN> extends AbstractPipeline<E_IN, Long, LongStream> implements LongStream {
+abstract class LongPipeline<E_IN> extends AbstractPipeline<E_IN, Long, LongStream> implements LongStream {
 
     /**
      * Constructor for the head of a stream pipeline.
@@ -57,7 +57,7 @@
      * @param source {@code Supplier<Spliterator>} describing the stream source
      * @param sourceFlags The source flags for the stream source, described in {@link StreamOpFlag}
      */
-    public LongPipeline(Supplier<? extends Spliterator<Long>> source, int sourceFlags) {
+    LongPipeline(Supplier<? extends Spliterator<Long>> source, int sourceFlags) {
         super(source, sourceFlags);
     }
 
@@ -67,7 +67,7 @@
      * @param source {@code Spliterator} describing the stream source
      * @param sourceFlags The source flags for the stream source, described in {@link StreamOpFlag}
      */
-    public LongPipeline(Spliterator<Long> source, int sourceFlags) {
+    LongPipeline(Spliterator<Long> source, int sourceFlags) {
         super(source, sourceFlags);
     }
 
@@ -75,44 +75,10 @@
      * Constructor for appending an intermediate operation onto an existing pipeline.
      *
      * @param upstream the upstream element source.
-     * @param op the operation performed upon elements.
+     * @param opFlags the operation flags
      */
-    public LongPipeline(AbstractPipeline<?, E_IN, ?> upstream, IntermediateOp<E_IN, Long> op) {
-        super(upstream, op);
-    }
-
-    // Methods from AbstractPipeline
-
-    @Override
-    protected StreamShape getOutputShape() {
-        return StreamShape.LONG_VALUE;
-    }
-
-    @Override
-    protected <P_IN> Node<Long> evaluateToNode(PipelineHelper<P_IN, Long> helper,
-                                               Spliterator<P_IN> spliterator,
-                                               boolean flattenTree,
-                                               IntFunction<Long[]> generator) {
-        return NodeUtils.longCollect(helper, spliterator, flattenTree);
-    }
-
-    @Override
-    protected <P_IN> Spliterator<Long> wrap(PipelineHelper<P_IN, Long> ph,
-                                            Supplier<Spliterator<P_IN>> supplier,
-                                            boolean isParallel) {
-        return new StreamSpliterators.LongWrappingSpliterator<>(ph, supplier, isParallel);
-    }
-
-    @Override
-    protected Spliterator.OfLong lazySpliterator(Supplier<? extends Spliterator<Long>> supplier) {
-        return new StreamSpliterators.DelegatingSpliterator.OfLong((Supplier<Spliterator.OfLong>) supplier);
-    }
-
-    @Override
-    protected void forEachWithCancel(Spliterator<Long> spliterator, Sink<Long> sink) {
-        Spliterator.OfLong spl = adapt(spliterator);
-        LongConsumer adaptedSink =  adapt(sink);
-        while (!sink.cancellationRequested() && spl.tryAdvance(adaptedSink)) { }
+    LongPipeline(AbstractPipeline<?, E_IN, ?> upstream, int opFlags) {
+        super(upstream, opFlags);
     }
 
     private static LongConsumer adapt(Sink<Long> sink) {
@@ -126,17 +92,7 @@
         }
     }
 
-    @Override
-    protected Node.Builder<Long> makeNodeBuilder(long exactSizeIfKnown, IntFunction<Long[]> generator) {
-        return Nodes.longMakeBuilder(exactSizeIfKnown);
-    }
-
-    @Override
-    public Spliterator.OfLong spliterator() {
-        return adapt(super.spliterator());
-    }
-
-    static Spliterator.OfLong adapt(Spliterator<Long> s) {
+    private static Spliterator.OfLong adapt(Spliterator<Long> s) {
         if (s instanceof Spliterator.OfLong) {
             return (Spliterator.OfLong) s;
         }
@@ -147,153 +103,244 @@
         }
     }
 
+
+    // Shape-specific methods
+
     @Override
-    public PrimitiveIterator.OfLong iterator() {
+    final StreamShape getOutputShape() {
+        return StreamShape.LONG_VALUE;
+    }
+
+    @Override
+    final <P_IN> Node<Long> evaluateToNode(PipelineHelper<Long> helper,
+                                           Spliterator<P_IN> spliterator,
+                                           boolean flattenTree,
+                                           IntFunction<Long[]> generator) {
+        return NodeUtils.longCollect(helper, spliterator, flattenTree);
+    }
+
+    @Override
+    final <P_IN> Spliterator<Long> wrap(PipelineHelper<Long> ph,
+                                        Supplier<Spliterator<P_IN>> supplier,
+                                        boolean isParallel) {
+        return new StreamSpliterators.LongWrappingSpliterator<>(ph, supplier, isParallel);
+    }
+
+    @Override
+    final Spliterator.OfLong lazySpliterator(Supplier<? extends Spliterator<Long>> supplier) {
+        return new StreamSpliterators.DelegatingSpliterator.OfLong((Supplier<Spliterator.OfLong>) supplier);
+    }
+
+    @Override
+    final void forEachWithCancel(Spliterator<Long> spliterator, Sink<Long> sink) {
+        Spliterator.OfLong spl = adapt(spliterator);
+        LongConsumer adaptedSink =  adapt(sink);
+        while (!sink.cancellationRequested() && spl.tryAdvance(adaptedSink)) { }
+    }
+
+    @Override
+    final Node.Builder<Long> makeNodeBuilder(long exactSizeIfKnown, IntFunction<Long[]> generator) {
+        return Nodes.longMakeBuilder(exactSizeIfKnown);
+    }
+
+
+    // LongStream
+
+    @Override
+    public final PrimitiveIterator.OfLong iterator() {
         return Spliterators.iteratorFromSpliterator(spliterator());
     }
 
+    @Override
+    public final Spliterator.OfLong spliterator() {
+        return adapt(super.spliterator());
+    }
+
     // Stateless intermediate ops from LongStream
 
     @Override
-    public DoubleStream doubles() {
-        return chainedToDouble(StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT, StreamShape.LONG_VALUE,
-                               (flags, sink) -> new Sink.ChainedLong(sink) {
-                                   @Override
-                                   public void accept(long t) {
-                                       downstream.accept((double) t);
-                                   }
-                               });
+    public final DoubleStream doubles() {
+        return new DoublePipeline.StatelessOp<Long>(this, StreamShape.LONG_VALUE,
+                                                    StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
+            @Override
+            Sink<Long> opWrapSink(int flags, Sink<Double> sink) {
+                return new Sink.ChainedLong(sink) {
+                    @Override
+                    public void accept(long t) {
+                        downstream.accept((double) t);
+                    }
+                };
+            }
+        };
     }
 
     @Override
-    public Stream<Long> boxed() {
+    public final Stream<Long> boxed() {
         return mapToObj(Long::valueOf);
     }
 
     @Override
-    public LongStream map(LongUnaryOperator mapper) {
+    public final LongStream map(LongUnaryOperator mapper) {
         Objects.requireNonNull(mapper);
-        return chainedToLong(StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT, StreamShape.LONG_VALUE,
-                             (flags, sink) -> new Sink.ChainedLong(sink) {
-                                 @Override
-                                 public void accept(long t) {
-                                     downstream.accept(mapper.applyAsLong(t));
-                                 }
-                             });
+        return new StatelessOp<Long>(this, StreamShape.LONG_VALUE,
+                                     StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
+            @Override
+            Sink<Long> opWrapSink(int flags, Sink<Long> sink) {
+                return new Sink.ChainedLong(sink) {
+                    @Override
+                    public void accept(long t) {
+                        downstream.accept(mapper.applyAsLong(t));
+                    }
+                };
+            }
+        };
     }
 
     @Override
-    public <U> Stream<U> mapToObj(LongFunction<U> mapper) {
+    public final <U> Stream<U> mapToObj(LongFunction<U> mapper) {
         Objects.requireNonNull(mapper);
-        return chainedToRef(StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT, StreamShape.LONG_VALUE,
-                            (flags, sink) -> new Sink.ChainedLong(sink) {
-                                @Override
-                                public void accept(long t) {
-                                    downstream.accept(mapper.apply(t));
-                                }
-                            });
+        return new ReferencePipeline.StatelessOp<Long, U>(this, StreamShape.LONG_VALUE,
+                                                          StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
+            @Override
+            Sink<Long> opWrapSink(int flags, Sink<U> sink) {
+                return new Sink.ChainedLong(sink) {
+                    @Override
+                    public void accept(long t) {
+                        downstream.accept(mapper.apply(t));
+                    }
+                };
+            }
+        };
     }
 
     @Override
-    public IntStream mapToInt(LongToIntFunction mapper) {
+    public final IntStream mapToInt(LongToIntFunction mapper) {
         Objects.requireNonNull(mapper);
-        return chainedToInt(StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT, StreamShape.LONG_VALUE,
-                            (flags, sink) -> new Sink.ChainedLong(sink) {
-                                @Override
-                                public void accept(long t) {
-                                    downstream.accept(mapper.applyAsInt(t));
-                                }
-                            });
+        return new IntPipeline.StatelessOp<Long>(this, StreamShape.LONG_VALUE,
+                                                 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
+            @Override
+            Sink<Long> opWrapSink(int flags, Sink<Integer> sink) {
+                return new Sink.ChainedLong(sink) {
+                    @Override
+                    public void accept(long t) {
+                        downstream.accept(mapper.applyAsInt(t));
+                    }
+                };
+            }
+        };
     }
 
     @Override
-    public DoubleStream mapToDouble(LongToDoubleFunction mapper) {
+    public final DoubleStream mapToDouble(LongToDoubleFunction mapper) {
         Objects.requireNonNull(mapper);
-        return chainedToDouble(StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT, StreamShape.LONG_VALUE,
-                               (flags, sink) -> new Sink.ChainedLong(sink) {
-                                   @Override
-                                   public void accept(long t) {
-                                       downstream.accept(mapper.applyAsDouble(t));
-                                   }
-                               });
+        return new DoublePipeline.StatelessOp<Long>(this, StreamShape.LONG_VALUE,
+                                                    StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
+            @Override
+            Sink<Long> opWrapSink(int flags, Sink<Double> sink) {
+                return new Sink.ChainedLong(sink) {
+                    @Override
+                    public void accept(long t) {
+                        downstream.accept(mapper.applyAsDouble(t));
+                    }
+                };
+            }
+        };
     }
 
     @Override
-    public LongStream flatMap(LongFunction<? extends LongStream> mapper) {
+    public final LongStream flatMap(LongFunction<? extends LongStream> mapper) {
         return flatMap((long i, LongConsumer sink) -> mapper.apply(i).sequential().forEach(sink));
     }
 
     @Override
-    public LongStream flatMap(FlatMapper.OfLongToLong mapper) {
+    public final LongStream flatMap(FlatMapper.OfLongToLong mapper) {
         Objects.requireNonNull(mapper);
-        return chainedToLong(StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED,
-                             StreamShape.LONG_VALUE,
-                             (flags, sink) -> new Sink.ChainedLong(sink) {
-                                 public void accept(long t) {
-                                     mapper.flattenInto(t, (Sink.OfLong) downstream);
-                                 }
-                             });
+        return new StatelessOp<Long>(this, StreamShape.LONG_VALUE,
+                                     StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
+            @Override
+            Sink<Long> opWrapSink(int flags, Sink<Long> sink) {
+                return new Sink.ChainedLong(sink) {
+                    public void accept(long t) {
+                        mapper.flattenInto(t, (Sink.OfLong) downstream);
+                    }
+                };
+            }
+        };
     }
 
     @Override
-    public LongStream filter(LongPredicate predicate) {
+    public final LongStream filter(LongPredicate predicate) {
         Objects.requireNonNull(predicate);
-        return chainedToLong(StreamOpFlag.NOT_SIZED, StreamShape.LONG_VALUE,
-                             (flags, sink) -> new Sink.ChainedLong(sink) {
-                                 @Override
-                                 public void accept(long t) {
-                                     if (predicate.test(t))
-                                         downstream.accept(t);
-                                 }
-                             });
+        return new StatelessOp<Long>(this, StreamShape.LONG_VALUE,
+                                     StreamOpFlag.NOT_SIZED) {
+            @Override
+            Sink<Long> opWrapSink(int flags, Sink<Long> sink) {
+                return new Sink.ChainedLong(sink) {
+                    @Override
+                    public void accept(long t) {
+                        if (predicate.test(t))
+                            downstream.accept(t);
+                    }
+                };
+            }
+        };
     }
 
     @Override
-    public LongStream peek(LongConsumer consumer) {
+    public final LongStream peek(LongConsumer consumer) {
         Objects.requireNonNull(consumer);
-        return chainedToLong(0, StreamShape.LONG_VALUE,
-                             (flags, sink) -> new Sink.ChainedLong(sink) {
-                                 @Override
-                                 public void accept(long t) {
-                                     consumer.accept(t);
-                                     downstream.accept(t);
-                                 }
-                             });
+        return new StatelessOp<Long>(this, StreamShape.LONG_VALUE,
+                                     0) {
+            @Override
+            Sink<Long> opWrapSink(int flags, Sink<Long> sink) {
+                return new Sink.ChainedLong(sink) {
+                    @Override
+                    public void accept(long t) {
+                        consumer.accept(t);
+                        downstream.accept(t);
+                    }
+                };
+            }
+        };
     }
 
     // Stateful intermediate ops from LongStream
 
-    @Override
-    public LongStream limit(long maxSize) {
-        if (maxSize < 0)
-            throw new IllegalArgumentException(Long.toString(maxSize));
-        return super.slice(0, maxSize);
+    private LongStream slice(long skip, long limit) {
+        return SliceOps.makeLong(this, skip, limit);
     }
 
     @Override
-    public LongStream substream(long startingOffset) {
+    public final LongStream limit(long maxSize) {
+        if (maxSize < 0)
+            throw new IllegalArgumentException(Long.toString(maxSize));
+        return slice(0, maxSize);
+    }
+
+    @Override
+    public final LongStream substream(long startingOffset) {
         if (startingOffset < 0)
             throw new IllegalArgumentException(Long.toString(startingOffset));
         if (startingOffset == 0)
             return this;
         else
-            return super.slice(startingOffset, -1);
+            return slice(startingOffset, -1);
     }
 
     @Override
-    public LongStream substream(long startingOffset, long endingOffset) {
+    public final LongStream substream(long startingOffset, long endingOffset) {
         if (startingOffset < 0 || endingOffset < startingOffset)
             throw new IllegalArgumentException(String.format("substream(%d, %d)", startingOffset, endingOffset));
-        return super.slice(startingOffset, endingOffset - startingOffset);
+        return slice(startingOffset, endingOffset - startingOffset);
     }
 
     @Override
-    public LongStream sorted() {
-        return pipeline(new SortedOp.OfLong());
+    public final LongStream sorted() {
+        return SortedOps.makeLong(this);
     }
 
     @Override
-    public LongStream distinct() {
+    public final LongStream distinct() {
         // @@@ While functional and quick to implement this approach is not very efficient.
         //     An efficient version requires an long-specific map/set implementation.
         return boxed().distinct().mapToLong(i -> (long) i);
@@ -302,33 +349,33 @@
     // Terminal ops from LongStream
 
     @Override
-    public void forEach(LongConsumer consumer) {
+    public final void forEach(LongConsumer consumer) {
         evaluate(ForEachOps.makeLong(consumer, false));
     }
 
     @Override
-    public void forEachOrdered(LongConsumer consumer) {
+    public final void forEachOrdered(LongConsumer consumer) {
         evaluate(ForEachOps.makeLong(consumer, true));
     }
 
     @Override
-    public long sum() {
+    public final long sum() {
         // use better algorithm to compensate for intermediate overflow?
         return reduce(0, Long::sum);
     }
 
     @Override
-    public OptionalLong min() {
+    public final OptionalLong min() {
         return reduce(Math::min);
     }
 
     @Override
-    public OptionalLong max() {
+    public final OptionalLong max() {
         return reduce(Math::max);
     }
 
     @Override
-    public OptionalDouble average() {
+    public final OptionalDouble average() {
         long[] avg = collect(() -> new long[2],
                              (ll, i) -> {
                                  ll[0]++;
@@ -342,27 +389,27 @@
     }
 
     @Override
-    public long count() {
+    public final long count() {
         return map(e -> 1L).sum();
     }
 
     @Override
-    public LongSummaryStatistics summaryStatistics() {
+    public final LongSummaryStatistics summaryStatistics() {
         return collect(LongSummaryStatistics::new, LongSummaryStatistics::accept, LongSummaryStatistics::combine);
     }
 
     @Override
-    public long reduce(long identity, LongBinaryOperator op) {
+    public final long reduce(long identity, LongBinaryOperator op) {
         return evaluate(ReduceOps.makeLong(identity, op));
     }
 
     @Override
-    public OptionalLong reduce(LongBinaryOperator op) {
+    public final OptionalLong reduce(LongBinaryOperator op) {
         return evaluate(ReduceOps.makeLong(op));
     }
 
     @Override
-    public <R> R collect(Supplier<R> resultFactory, ObjLongConsumer<R> accumulator, BiConsumer<R, R> combiner) {
+    public final <R> R collect(Supplier<R> resultFactory, ObjLongConsumer<R> accumulator, BiConsumer<R, R> combiner) {
         BinaryOperator<R> operator = (left, right) -> {
             combiner.accept(left, right);
             return left;
@@ -371,32 +418,88 @@
     }
 
     @Override
-    public boolean anyMatch(LongPredicate predicate) {
+    public final boolean anyMatch(LongPredicate predicate) {
         return evaluate(MatchOps.makeLong(predicate, MatchOps.MatchKind.ANY));
     }
 
     @Override
-    public boolean allMatch(LongPredicate predicate) {
+    public final boolean allMatch(LongPredicate predicate) {
         return evaluate(MatchOps.makeLong(predicate, MatchOps.MatchKind.ALL));
     }
 
     @Override
-    public boolean noneMatch(LongPredicate predicate) {
+    public final boolean noneMatch(LongPredicate predicate) {
         return evaluate(MatchOps.makeLong(predicate, MatchOps.MatchKind.NONE));
     }
 
     @Override
-    public OptionalLong findFirst() {
+    public final OptionalLong findFirst() {
         return evaluate(FindOps.makeLong(true));
     }
 
     @Override
-    public OptionalLong findAny() {
+    public final OptionalLong findAny() {
         return evaluate(FindOps.makeLong(false));
     }
 
     @Override
-    public long[] toArray() {
+    public final long[] toArray() {
         return ((Node.OfLong) evaluateToArrayNode(Long[]::new)).asLongArray();
     }
+
+
+    //
+
+    static class Head<E_IN> extends LongPipeline<E_IN> {
+        Head(Supplier<? extends Spliterator<Long>> source, int sourceFlags) {
+            super(source, sourceFlags);
+        }
+
+        Head(Spliterator<Long> source, int sourceFlags) {
+            super(source, sourceFlags);
+        }
+
+        @Override
+        final boolean opIsStateful() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        final Sink<E_IN> opWrapSink(int flags, Sink<Long> sink) {
+            throw new UnsupportedOperationException();
+        }
+    }
+
+    abstract static class StatelessOp<E_IN> extends LongPipeline<E_IN> {
+        StatelessOp(AbstractPipeline<?, E_IN, ?> upstream,
+                    StreamShape inputShape,
+                    int opFlags) {
+            super(upstream, opFlags);
+            assert upstream.getOutputShape() == inputShape;
+        }
+
+        @Override
+        final boolean opIsStateful() {
+            return false;
+        }
+    }
+
+    abstract static class StatefulOp<E_IN> extends LongPipeline<E_IN> {
+        StatefulOp(AbstractPipeline<?, E_IN, ?> upstream,
+                   StreamShape inputShape,
+                   int opFlags) {
+            super(upstream, opFlags);
+            assert upstream.getOutputShape() == inputShape;
+        }
+
+        @Override
+        final boolean opIsStateful() {
+            return true;
+        }
+
+        @Override
+        abstract <P_IN> Node<Long> opEvaluateParallel(PipelineHelper<Long> helper,
+                                                      Spliterator<P_IN> spliterator,
+                                                      IntFunction<Long[]> generator);
+    }
 }
--- a/src/share/classes/java/util/stream/MatchOps.java	Wed Apr 03 11:29:23 2013 -0700
+++ b/src/share/classes/java/util/stream/MatchOps.java	Wed Apr 03 21:51:55 2013 +0200
@@ -241,12 +241,12 @@
         }
 
         @Override
-        public <S> Boolean evaluateSequential(PipelineHelper<S, T> helper, Spliterator<S> spliterator) {
+        public <S> Boolean evaluateSequential(PipelineHelper<T> helper, Spliterator<S> spliterator) {
             return helper.wrapAndCopyInto(sinkSupplier.get(), spliterator).getAndClearState();
         }
 
         @Override
-        public <S> Boolean evaluateParallel(PipelineHelper<S, T> helper, Spliterator<S> spliterator) {
+        public <S> Boolean evaluateParallel(PipelineHelper<T> helper, Spliterator<S> spliterator) {
             // Approach for parallel implementation:
             // - Decompose as per usual
             // - run match on leaf chunks, call result "b"
@@ -291,7 +291,7 @@
     private static final class MatchTask<S, T> extends AbstractShortCircuitTask<S, T, Boolean, MatchTask<S, T>> {
         private final MatchOp<T> op;
 
-        MatchTask(MatchOp<T> op, PipelineHelper<S, T> helper, Spliterator<S> spliterator) {
+        MatchTask(MatchOp<T> op, PipelineHelper<T> helper, Spliterator<S> spliterator) {
             super(helper, spliterator);
             this.op = op;
         }
--- a/src/share/classes/java/util/stream/NodeUtils.java	Wed Apr 03 11:29:23 2013 -0700
+++ b/src/share/classes/java/util/stream/NodeUtils.java	Wed Apr 03 21:51:55 2013 +0200
@@ -63,7 +63,7 @@
      * @param generator the array generator
      * @return the {@link Node} encapsulating the output elements.
      */
-    public static <P_IN, P_OUT> Node<P_OUT> collect(PipelineHelper<P_IN, P_OUT> helper,
+    public static <P_IN, P_OUT> Node<P_OUT> collect(PipelineHelper<P_OUT> helper,
                                                     Spliterator<P_IN> spliterator,
                                                     boolean flattenTree,
                                                     IntFunction<P_OUT[]> generator) {
@@ -129,7 +129,7 @@
      * @param <P_IN> type of input elements to the pipeline
      * @return the {@link Node.OfInt} encapsulating the output elements.
      */
-    public static <P_IN> Node.OfInt intCollect(PipelineHelper<P_IN, Integer> helper,
+    public static <P_IN> Node.OfInt intCollect(PipelineHelper<Integer> helper,
                                                Spliterator<P_IN> spliterator,
                                                boolean flattenTree) {
         long size = helper.exactOutputSizeIfKnown(spliterator);
@@ -193,7 +193,7 @@
      * @param <P_IN> type of input elements to the pipeline
      * @return the {@link Node.OfLong} encapsulating the output elements.
      */
-    public static <P_IN> Node.OfLong longCollect(PipelineHelper<P_IN, Long> helper,
+    public static <P_IN> Node.OfLong longCollect(PipelineHelper<Long> helper,
                                                  Spliterator<P_IN> spliterator,
                                                  boolean flattenTree) {
         long size = helper.exactOutputSizeIfKnown(spliterator);
@@ -257,7 +257,7 @@
      * @param <P_IN> type of input elements to the pipeline
      * @return the {@link Node.OfDouble} encapsulating the output elements.
      */
-    public static <P_IN> Node.OfDouble doubleCollect(PipelineHelper<P_IN, Double> helper,
+    public static <P_IN> Node.OfDouble doubleCollect(PipelineHelper<Double> helper,
                                                      Spliterator<P_IN> spliterator,
                                                      boolean flattenTree) {
         long size = helper.exactOutputSizeIfKnown(spliterator);
@@ -301,10 +301,10 @@
     // Reference implementations
 
     private static final class CollectorTask<T, U> extends AbstractTask<T, U, Node<U>, CollectorTask<T, U>> {
-        private final PipelineHelper<T, U> helper;
+        private final PipelineHelper<U> helper;
         private final IntFunction<U[]> generator;
 
-        CollectorTask(PipelineHelper<T, U> helper, IntFunction<U[]> generator, Spliterator<T> spliterator) {
+        CollectorTask(PipelineHelper<U> helper, IntFunction<U[]> generator, Spliterator<T> spliterator) {
             super(helper, spliterator, AbstractTask.suggestTargetSize(spliterator.estimateSize()));
             this.helper = helper;
             this.generator = generator;
@@ -347,13 +347,13 @@
 
     private static final class SizedCollectorTask<T, U> extends CountedCompleter<Void> {
         private final Spliterator<T> spliterator;
-        private final PipelineHelper<T, U> helper;
+        private final PipelineHelper<U> helper;
         private final U[] array;
         private final long targetSize;
         private long offset;
         private long length;
 
-        SizedCollectorTask(Spliterator<T> spliterator, PipelineHelper<T, U> helper, U[] array) {
+        SizedCollectorTask(Spliterator<T> spliterator, PipelineHelper<U> helper, U[] array) {
             assert spliterator.hasCharacteristics(Spliterator.SUBSIZED);
             this.spliterator = spliterator;
             this.helper = helper;
@@ -491,9 +491,9 @@
     // of push leaf data to an array or copying node data to an array at an offset.
 
     private static final class IntCollectorTask<T> extends AbstractTask<T, Integer, Node.OfInt, IntCollectorTask<T>> {
-        private final PipelineHelper<T, Integer> helper;
+        private final PipelineHelper<Integer> helper;
 
-        IntCollectorTask(PipelineHelper<T, Integer> helper, Spliterator<T> spliterator) {
+        IntCollectorTask(PipelineHelper<Integer> helper, Spliterator<T> spliterator) {
             super(helper, spliterator, AbstractTask.suggestTargetSize(spliterator.estimateSize()));
             this.helper = helper;
         }
@@ -533,13 +533,13 @@
 
     private static final class IntSizedCollectorTask<T> extends CountedCompleter<Void> {
         private final Spliterator<T> spliterator;
-        private final PipelineHelper<T, Integer> helper;
+        private final PipelineHelper<Integer> helper;
         private final int[] array;
         private final long targetSize;
         private long offset;
         private long length;
 
-        IntSizedCollectorTask(Spliterator<T> spliterator, PipelineHelper<T, Integer> helper, int[] array) {
+        IntSizedCollectorTask(Spliterator<T> spliterator, PipelineHelper<Integer> helper, int[] array) {
             assert spliterator.hasCharacteristics(Spliterator.SUBSIZED);
             this.spliterator = spliterator;
             this.helper = helper;
@@ -674,9 +674,9 @@
     // Long value implementations
 
     private static final class LongCollectorTask<T> extends AbstractTask<T, Long, Node.OfLong, LongCollectorTask<T>> {
-        private final PipelineHelper<T, Long> helper;
+        private final PipelineHelper<Long> helper;
 
-        LongCollectorTask(PipelineHelper<T, Long> helper, Spliterator<T> spliterator) {
+        LongCollectorTask(PipelineHelper<Long> helper, Spliterator<T> spliterator) {
             super(helper, spliterator, AbstractTask.suggestTargetSize(spliterator.estimateSize()));
             this.helper = helper;
         }
@@ -716,13 +716,13 @@
 
     private static final class LongSizedCollectorTask<T> extends CountedCompleter<Void> {
         private final Spliterator<T> spliterator;
-        private final PipelineHelper<T, Long> helper;
+        private final PipelineHelper<Long> helper;
         private final long[] array;
         private final long targetSize;
         private long offset;
         private long length;
 
-        LongSizedCollectorTask(Spliterator<T> spliterator, PipelineHelper<T, Long> helper, long[] array) {
+        LongSizedCollectorTask(Spliterator<T> spliterator, PipelineHelper<Long> helper, long[] array) {
             assert spliterator.hasCharacteristics(Spliterator.SUBSIZED);
             this.spliterator = spliterator;
             this.helper = helper;
@@ -856,9 +856,9 @@
     // Double value implementations
 
     private static final class DoubleCollectorTask<T> extends AbstractTask<T, Double, Node.OfDouble, DoubleCollectorTask<T>> {
-        private final PipelineHelper<T, Double> helper;
+        private final PipelineHelper<Double> helper;
 
-        DoubleCollectorTask(PipelineHelper<T, Double> helper, Spliterator<T> spliterator) {
+        DoubleCollectorTask(PipelineHelper<Double> helper, Spliterator<T> spliterator) {
             super(helper, spliterator, AbstractTask.suggestTargetSize(spliterator.estimateSize()));
             this.helper = helper;
         }
@@ -898,13 +898,13 @@
 
     private static final class DoubleSizedCollectorTask<T> extends CountedCompleter<Void> {
         private final Spliterator<T> spliterator;
-        private final PipelineHelper<T, Double> helper;
+        private final PipelineHelper<Double> helper;
         private final double[] array;
         private final long targetSize;
         private long offset;
         private long length;
 
-        DoubleSizedCollectorTask(Spliterator<T> spliterator, PipelineHelper<T, Double> helper, double[] array) {
+        DoubleSizedCollectorTask(Spliterator<T> spliterator, PipelineHelper<Double> helper, double[] array) {
             assert spliterator.hasCharacteristics(Spliterator.SUBSIZED);
             this.spliterator = spliterator;
             this.helper = helper;
--- a/src/share/classes/java/util/stream/PipelineHelper.java	Wed Apr 03 11:29:23 2013 -0700
+++ b/src/share/classes/java/util/stream/PipelineHelper.java	Wed Apr 03 21:51:55 2013 +0200
@@ -43,18 +43,17 @@
  * by this {@code PipelineHelper}.  The {@code PipelineHelper} is passed to the
  * {@link TerminalOp#evaluateParallel(PipelineHelper, java.util.Spliterator)},
  * {@link TerminalOp#evaluateSequential(PipelineHelper, java.util.Spliterator)}, and
- * {@link IntermediateOp#evaluateParallel(PipelineHelper, java.util.Spliterator, java.util.function.IntFunction)},
+ * {@link AbstractPipeline#opEvaluateParallel(PipelineHelper, java.util.Spliterator, java.util.function.IntFunction)},
  * methods, which can use the {@code PipelineHelper} to access the source
  * {@code Spliterator} for the pipeline, information about the pipeline such as
  * input shape, output shape, stream flags, and size, and use the helper methods
  * such as {@link #wrapAndCopyInto(Sink, Spliterator)}, {@link #copyInto(Sink, Spliterator)},
  * and {@link #wrapSink(Sink)} to execute pipeline operations.
  *
- * @param <P_IN>  Type of input elements to the pipeline
  * @param <P_OUT> Type of output elements from the pipeline
  * @since 1.8
  */
-interface PipelineHelper<P_IN, P_OUT> {
+abstract class PipelineHelper<P_OUT> {
 
     /**
      * Gets the combined stream and operation flags for the output of the
@@ -65,7 +64,7 @@
      *         pipeline
      * @see StreamOpFlag
      */
-    int getStreamAndOpFlags();
+    abstract int getStreamAndOpFlags();
 
     /**
      * Returns the exact output size of the portion of the output resulting from
@@ -84,7 +83,7 @@
      *        source data
      * @return the exact size if known, or -1 if infinite or unknown
      */
-    long exactOutputSizeIfKnown(Spliterator<P_IN> spliterator);
+    abstract<P_IN> long exactOutputSizeIfKnown(Spliterator<P_IN> spliterator);
 
     /**
      * Applies the pipeline stages described by this {@code PipelineHelper} to
@@ -101,7 +100,7 @@
      * @param spliterator the spliterator describing the portion of the source
      *        input to process
      */
-    <S extends Sink<P_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator);
+    abstract<P_IN, S extends Sink<P_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator);
 
     /**
      * Pushes elements obtained from the {@code Spliterator} into the provided
@@ -118,7 +117,23 @@
      * @param wrappedSink the destination {@code Sink}
      * @param spliterator the source {@code Spliterator}
      */
-    void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator);
+    abstract<P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator);
+
+    /**
+     * Pushes elements obtained from the {@code Spliterator} into the provided
+     * {@code Sink}, checking {@link Sink#cancellationRequested()} after each
+     * element, and stopping if cancellation is requested.
+     *
+     * @implSpec
+     * This method conforms to the {@code Sink} protocol of calling
+     * {@code Sink.begin} before pushing elements, via {@code Sink.accept}, and
+     * calling {@code Sink.end} after all elements have been pushed or if
+     * cancellation is requested.
+     *
+     * @param wrappedSink the destination {@code Sink}
+     * @param spliterator the source {@code Spliterator}
+     */
+    abstract <P_IN> void copyIntoWithCancel(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator);
 
     /**
      * Takes a {@code Sink} that accepts elements of the output type of the
@@ -131,7 +146,7 @@
      * @return a {@code Sink} that implements the pipeline stages and sends
      *         results to the provided {@code Sink}
      */
-    Sink<P_IN> wrapSink(Sink<P_OUT> sink);
+    abstract<P_IN> Sink<P_IN> wrapSink(Sink<P_OUT> sink);
 
     /**
      * Constructs a @{link Node.Builder} compatible with the output shape of
@@ -145,8 +160,8 @@
      * @return A {@code Node.Builder} compatible with the output shape of this
      *         {@code PipelineHelper}
      */
-    Node.Builder<P_OUT> makeNodeBuilder(long exactSizeIfKnown,
-                                        IntFunction<P_OUT[]> generator);
+    abstract Node.Builder<P_OUT> makeNodeBuilder(long exactSizeIfKnown,
+                                                 IntFunction<P_OUT[]> generator);
 
     /**
      * Collects all output elements resulting from applying the pipeline stages
@@ -170,39 +185,7 @@
      * @param generator the array generator
      * @return the {@code Node} containing all output elements
      */
-    Node<P_OUT> evaluate(Spliterator<P_IN> spliterator,
-                         boolean flatten,
-                         IntFunction<P_OUT[]> generator);
-
-    /**
-     * Collects all output elements resulting from the applying the pipeline
-     * stages, plus an additional final stage that is an intermediate operation,
-     * to the source {@code Spliterator} into a {code Node}.  The order of
-     * output elements will respect the encounter order of the source stream,
-     * and all computation will happen in the invoking thread.
-     * <p>
-     * Implementations of
-     * {@link IntermediateOp#evaluateParallel(PipelineHelper, java.util.Spliterator, java.util.function.IntFunction)}
-     * can defer to this method if a sequential implementation is acceptable.
-     *
-     * @implSpec
-     * If the intermediate operation injects {@link StreamOpFlag#SHORT_CIRCUIT}
-     * then this implementation must stop collecting output elements when the
-     * sink returned from {@link IntermediateOp#wrapSink(int, Sink)} reports it
-     * is cancelled.
-     * <p>
-     * If the intermediate operation preserves or injects
-     * {@link StreamOpFlag#SIZED} and the output size of the pipeline is known
-     * then this implementation may apply size optimizations since the output
-     * size is known.
-     *
-     * @param op An {@code IntermediateOp} representing the final stage in the
-     *        pipeline, typically a {@code StatefulOp}
-     * @param spliterator the source {@code Spliterator}
-     * @param generator the array generator
-     * @return A {@code Node} containing the output of the stream pipeline
-     */
-    Node<P_OUT> evaluateSequential(IntermediateOp<P_OUT, P_OUT> op,
-                                   Spliterator<P_IN> spliterator,
-                                   IntFunction<P_OUT[]> generator);
+    abstract<P_IN> Node<P_OUT> evaluate(Spliterator<P_IN> spliterator,
+                                        boolean flatten,
+                                        IntFunction<P_OUT[]> generator);
 }
--- a/src/share/classes/java/util/stream/ReduceOps.java	Wed Apr 03 11:29:23 2013 -0700
+++ b/src/share/classes/java/util/stream/ReduceOps.java	Wed Apr 03 21:51:55 2013 +0200
@@ -673,12 +673,12 @@
         }
 
         @Override
-        public <P_IN> R evaluateSequential(PipelineHelper<P_IN, T> helper, Spliterator<P_IN> spliterator) {
+        public <P_IN> R evaluateSequential(PipelineHelper<T> helper, Spliterator<P_IN> spliterator) {
             return helper.wrapAndCopyInto(makeSink(), spliterator).get();
         }
 
         @Override
-        public <P_IN> R evaluateParallel(PipelineHelper<P_IN, T> helper, Spliterator<P_IN> spliterator) {
+        public <P_IN> R evaluateParallel(PipelineHelper<T> helper, Spliterator<P_IN> spliterator) {
             return new ReduceTask<>(this, helper, spliterator).invoke().get();
         }
     }
@@ -688,7 +688,7 @@
             extends AbstractTask<P_IN, P_OUT, S, ReduceTask<P_IN, P_OUT, R, S>> {
         private final ReduceOp<P_OUT, R, S> op;
 
-        ReduceTask(ReduceOp<P_OUT, R, S> op, PipelineHelper<P_IN, P_OUT> helper, Spliterator<P_IN> spliterator) {
+        ReduceTask(ReduceOp<P_OUT, R, S> op, PipelineHelper<P_OUT> helper, Spliterator<P_IN> spliterator) {
             super(helper, spliterator);
             this.op = op;
         }
--- a/src/share/classes/java/util/stream/ReferencePipeline.java	Wed Apr 03 11:29:23 2013 -0700
+++ b/src/share/classes/java/util/stream/ReferencePipeline.java	Wed Apr 03 21:51:55 2013 +0200
@@ -51,16 +51,17 @@
  *
  * @since 1.8
  */
-class ReferencePipeline<T, U> extends AbstractPipeline<T, U, Stream<U>> implements Stream<U>  {
+abstract class ReferencePipeline<T, U> extends AbstractPipeline<T, U, Stream<U>> implements Stream<U>  {
 
     /**
      * Constructor for the head of a stream pipeline.
      *
      * @param source {@code Supplier<Spliterator>} describing the stream source
-     * @param sourceFlags The source flags for the stream source, described in {@link StreamOpFlag}
+     * @param sourceFlags The source flags for the stream source, described in
+     * {@link StreamOpFlag}
      */
-    public ReferencePipeline(Supplier<? extends Spliterator<?>> source,
-                             int sourceFlags) {
+    ReferencePipeline(Supplier<? extends Spliterator<?>> source,
+                      int sourceFlags) {
         super(source, sourceFlags);
     }
 
@@ -68,252 +69,309 @@
      * Constructor for the head of a stream pipeline.
      *
      * @param source {@code Spliterator} describing the stream source
-     * @param sourceFlags The source flags for the stream source, described in {@link StreamOpFlag}
+     * @param sourceFlags The source flags for the stream source, described in
+     * {@link StreamOpFlag}
      */
-    public ReferencePipeline(Spliterator<?> source, int sourceFlags) {
+    ReferencePipeline(Spliterator<?> source, int sourceFlags) {
         super(source, sourceFlags);
     }
 
     /**
-     * Constructor for appending an intermediate operation onto an existing pipeline.
+     * Constructor for appending an intermediate operation onto an existing
+     * pipeline.
      *
      * @param upstream the upstream element source.
-     * @param op the operation performed upon elements.
      */
-    public ReferencePipeline(AbstractPipeline<?, T, ?> upstream, IntermediateOp<T, U> op) {
-        super(upstream, op);
+    ReferencePipeline(AbstractPipeline<?, T, ?> upstream, int opFlags) {
+        super(upstream, opFlags);
     }
 
-    // Methods from AbstractPipeline
+
+    // Shape-specific methods
 
     @Override
-    protected StreamShape getOutputShape() {
+    final StreamShape getOutputShape() {
         return StreamShape.REFERENCE;
     }
 
     @Override
-    protected <P_IN> Node<U> evaluateToNode(PipelineHelper<P_IN, U> helper,
-                                            Spliterator<P_IN> spliterator,
-                                            boolean flattenTree,
-                                            IntFunction<U[]> generator) {
+    final <P_IN> Node<U> evaluateToNode(PipelineHelper<U> helper,
+                                        Spliterator<P_IN> spliterator,
+                                        boolean flattenTree,
+                                        IntFunction<U[]> generator) {
         return NodeUtils.collect(helper, spliterator, flattenTree, generator);
     }
 
     @Override
-    protected <P_IN> Spliterator<U> wrap(PipelineHelper<P_IN, U> ph,
-                                         Supplier<Spliterator<P_IN>> supplier,
-                                         boolean isParallel) {
+    final <P_IN> Spliterator<U> wrap(PipelineHelper<U> ph,
+                                     Supplier<Spliterator<P_IN>> supplier,
+                                     boolean isParallel) {
         return new StreamSpliterators.WrappingSpliterator<>(ph, supplier, isParallel);
     }
 
     @Override
-    protected Spliterator<U> lazySpliterator(Supplier<? extends Spliterator<U>> supplier) {
+    final Spliterator<U> lazySpliterator(Supplier<? extends Spliterator<U>> supplier) {
         return new StreamSpliterators.DelegatingSpliterator<>(supplier);
     }
 
     @Override
-    protected void forEachWithCancel(Spliterator<U> spliterator, Sink<U> sink) {
+    final void forEachWithCancel(Spliterator<U> spliterator, Sink<U> sink) {
         while (!sink.cancellationRequested() && spliterator.tryAdvance(sink)) { }
     }
 
     @Override
-    protected Node.Builder<U> makeNodeBuilder(long exactSizeIfKnown, IntFunction<U[]> generator) {
+    final Node.Builder<U> makeNodeBuilder(long exactSizeIfKnown, IntFunction<U[]> generator) {
         return Nodes.makeBuilder(exactSizeIfKnown, generator);
     }
 
+
+    // BaseStream
+
     @Override
-    public Iterator<U> iterator() {
+    public final Iterator<U> iterator() {
         return Spliterators.iteratorFromSpliterator(spliterator());
     }
 
+
+    // Stream
+
     // Stateless intermediate operations from Stream
 
     @Override
-    public Stream<U> filter(Predicate<? super U> predicate) {
+    public final Stream<U> filter(Predicate<? super U> predicate) {
         Objects.requireNonNull(predicate);
-        return chainedToRef(StreamOpFlag.NOT_SIZED, StreamShape.REFERENCE,
-                            (flags, sink) -> new Sink.ChainedReference<U>(sink) {
-                                @Override
-                                public void accept(U u) {
-                                    if (predicate.test(u))
-                                        downstream.accept(u);
-                                }
-                            });
+        return new StatelessOp<U, U>(this, StreamShape.REFERENCE,
+                                     StreamOpFlag.NOT_SIZED) {
+            @Override
+            Sink<U> opWrapSink(int flags, Sink<U> sink) {
+                return new Sink.ChainedReference<U>(sink) {
+                    @Override
+                    public void accept(U u) {
+                        if (predicate.test(u))
+                            downstream.accept(u);
+                    }
+                };
+            }
+        };
     }
 
     @Override
-    public <R> Stream<R> map(Function<? super U, ? extends R> mapper) {
+    public final <R> Stream<R> map(Function<? super U, ? extends R> mapper) {
         Objects.requireNonNull(mapper);
-        return chainedToRef(StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT, StreamShape.REFERENCE,
-                            (flags, sink) -> new Sink.ChainedReference<U>(sink) {
-                                @Override
-                                public void accept(U u) {
-                                    downstream.accept(mapper.apply(u));
-                                }
-                            });
+        return new StatelessOp<U, R>(this, StreamShape.REFERENCE,
+                                     StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
+            @Override
+            Sink<U> opWrapSink(int flags, Sink<R> sink) {
+                return new Sink.ChainedReference<U>(sink) {
+                    @Override
+                    public void accept(U u) {
+                        downstream.accept(mapper.apply(u));
+                    }
+                };
+            }
+        };
     }
 
     @Override
-    public IntStream mapToInt(ToIntFunction<? super U> mapper) {
+    public final IntStream mapToInt(ToIntFunction<? super U> mapper) {
         Objects.requireNonNull(mapper);
-        return chainedToInt(StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT, StreamShape.REFERENCE,
-                            (flags, sink) -> new Sink.ChainedReference<U>(sink) {
-                                @Override
-                                public void accept(U u) {
-                                    downstream.accept(mapper.applyAsInt(u));
-                                }
-                            });
+        return new IntPipeline.StatelessOp<U>(this, StreamShape.REFERENCE,
+                                              StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
+            @Override
+            Sink<U> opWrapSink(int flags, Sink<Integer> sink) {
+                return new Sink.ChainedReference<U>(sink) {
+                    @Override
+                    public void accept(U u) {
+                        downstream.accept(mapper.applyAsInt(u));
+                    }
+                };
+            }
+        };
     }
 
     @Override
-    public LongStream mapToLong(ToLongFunction<? super U> mapper) {
+    public final LongStream mapToLong(ToLongFunction<? super U> mapper) {
         Objects.requireNonNull(mapper);
-        return chainedToLong(StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT, StreamShape.REFERENCE,
-                             (flags, sink) -> new Sink.ChainedReference<U>(sink) {
-                                 @Override
-                                 public void accept(U u) {
-                                     downstream.accept(mapper.applyAsLong(u));
-                                 }
-                             });
+        return new LongPipeline.StatelessOp<U>(this, StreamShape.REFERENCE,
+                                      StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
+            @Override
+            Sink<U> opWrapSink(int flags, Sink<Long> sink) {
+                return new Sink.ChainedReference<U>(sink) {
+                    @Override
+                    public void accept(U u) {
+                        downstream.accept(mapper.applyAsLong(u));
+                    }
+                };
+            }
+        };
     }
 
     @Override
-    public DoubleStream mapToDouble(ToDoubleFunction<? super U> mapper) {
+    public final DoubleStream mapToDouble(ToDoubleFunction<? super U> mapper) {
         Objects.requireNonNull(mapper);
-        return chainedToDouble(StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT, StreamShape.REFERENCE,
-                               (flags, sink) -> new Sink.ChainedReference<U>(sink) {
-                                   @Override
-                                   public void accept(U u) {
-                                       downstream.accept(mapper.applyAsDouble(u));
-                                   }
-                               });
+        return new DoublePipeline.StatelessOp<U>(this, StreamShape.REFERENCE,
+                                        StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
+            @Override
+            Sink<U> opWrapSink(int flags, Sink<Double> sink) {
+                return new Sink.ChainedReference<U>(sink) {
+                    @Override
+                    public void accept(U u) {
+                        downstream.accept(mapper.applyAsDouble(u));
+                    }
+                };
+            }
+        };
     }
 
     @Override
-    public <R> Stream<R> flatMap(Function<U, Stream<? extends R>> mapper) {
+    public final <R> Stream<R> flatMap(Function<U, Stream<? extends R>> mapper) {
         Objects.requireNonNull(mapper);
         // We can do better than this, by polling cancellationRequested when stream is infinite
         return flatMap((U u, Consumer<R> sink) -> mapper.apply(u).sequential().forEach(sink));
     }
 
     @Override
-    public <R> Stream<R> flatMap(FlatMapper<? super U, R> mapper) {
+    public final <R> Stream<R> flatMap(FlatMapper<? super U, R> mapper) {
         Objects.requireNonNull(mapper);
-        return chainedToRef(StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED,
-                            StreamShape.REFERENCE,
-                            (flags, sink) -> new Sink.ChainedReference<U>(sink) {
-                                public void accept(U u) {
-                                    mapper.flattenInto(u, downstream);
-                                }
-                            });
+        return new StatelessOp<U, R>(this, StreamShape.REFERENCE,
+                                     StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
+            @Override
+            Sink<U> opWrapSink(int flags, Sink<R> sink) {
+                return new Sink.ChainedReference<U>(sink) {
+                    public void accept(U u) {
+                        mapper.flattenInto(u, downstream);
+                    }
+                };
+            }
+        };
     }
 
     @Override
-    public IntStream flatMapToInt(FlatMapper.ToInt<? super U> mapper) {
+    public final IntStream flatMapToInt(FlatMapper.ToInt<? super U> mapper) {
         Objects.requireNonNull(mapper);
-        return chainedToInt(StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED,
-                            StreamShape.REFERENCE,
-                            (flags, sink) -> new Sink.ChainedReference<U>(sink) {
-                                public void accept(U u) {
-                                    mapper.flattenInto(u, (Sink.OfInt) downstream);
-                                }
-                            });
+        return new IntPipeline.StatelessOp<U>(this, StreamShape.REFERENCE,
+                                     StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
+            @Override
+            Sink<U> opWrapSink(int flags, Sink<Integer> sink) {
+                return new Sink.ChainedReference<U>(sink) {
+                    public void accept(U u) {
+                        mapper.flattenInto(u, (Sink.OfInt) downstream);
+                    }
+                };
+            }
+        };
     }
 
     @Override
-    public DoubleStream flatMapToDouble(FlatMapper.ToDouble<? super U> mapper) {
+    public final DoubleStream flatMapToDouble(FlatMapper.ToDouble<? super U> mapper) {
         Objects.requireNonNull(mapper);
-        return chainedToDouble(StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED,
-                               StreamShape.REFERENCE,
-                               (flags, sink) -> new Sink.ChainedReference<U>(sink) {
-                                   public void accept(U u) {
-                                       mapper.flattenInto(u, (Sink.OfDouble) downstream);
-                                   }
-                               });
+        return new DoublePipeline.StatelessOp<U>(this, StreamShape.REFERENCE,
+                                        StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
+            @Override
+            Sink<U> opWrapSink(int flags, Sink<Double> sink) {
+                return new Sink.ChainedReference<U>(sink) {
+                    public void accept(U u) {
+                        mapper.flattenInto(u, (Sink.OfDouble) downstream);
+                    }
+                };
+            }
+        };
     }
 
     @Override
-    public LongStream flatMapToLong(FlatMapper.ToLong<? super U> mapper) {
+    public final LongStream flatMapToLong(FlatMapper.ToLong<? super U> mapper) {
         Objects.requireNonNull(mapper);
-        return chainedToLong(StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED,
-                             StreamShape.REFERENCE,
-                             (flags, sink) -> new Sink.ChainedReference<U>(sink) {
-                                 public void accept(U u) {
-                                     mapper.flattenInto(u, (Sink.OfLong) downstream);
-                                 }
-                             });
+        return new LongPipeline.StatelessOp<U>(this, StreamShape.REFERENCE,
+                                      StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
+            @Override
+            Sink<U> opWrapSink(int flags, Sink<Long> sink) {
+                return new Sink.ChainedReference<U>(sink) {
+                    public void accept(U u) {
+                        mapper.flattenInto(u, (Sink.OfLong) downstream);
+                    }
+                };
+            }
+        };
     }
 
     @Override
-    public Stream<U> peek(Consumer<? super U> tee) {
+    public final Stream<U> peek(Consumer<? super U> tee) {
         Objects.requireNonNull(tee);
-        return chainedToRef(0, StreamShape.REFERENCE,
-                            (flags, sink) -> new Sink.ChainedReference<U>(sink) {
-                                @Override
-                                public void accept(U u) {
-                                    tee.accept(u);
-                                    downstream.accept(u);
-                                }
-                            });
+        return new StatelessOp<U, U>(this, StreamShape.REFERENCE,
+                                     0) {
+            @Override
+            Sink<U> opWrapSink(int flags, Sink<U> sink) {
+                return new Sink.ChainedReference<U>(sink) {
+                    @Override
+                    public void accept(U u) {
+                        tee.accept(u);
+                        downstream.accept(u);
+                    }
+                };
+            }
+        };
     }
 
     // Stateful intermediate operations from Stream
 
     @Override
-    public Stream<U> distinct() {
-        return pipeline(new DistinctOp<U>());
+    public final Stream<U> distinct() {
+        return DistinctOps.makeRef(this);
     }
 
     @Override
-    public Stream<U> sorted() {
-        return pipeline(new SortedOp.OfRef<U>());
+    public final Stream<U> sorted() {
+        return SortedOps.makeRef(this);
     }
 
     @Override
-    public Stream<U> sorted(Comparator<? super U> comparator) {
-        return pipeline(new SortedOp.OfRef<>(comparator));
+    public final Stream<U> sorted(Comparator<? super U> comparator) {
+        return SortedOps.makeRef(this, comparator);
+    }
+
+    private final Stream<U> slice(long skip, long limit) {
+        return SliceOps.makeRef(this, skip, limit);
     }
 
     @Override
-    public Stream<U> limit(long maxSize) {
+    public final Stream<U> limit(long maxSize) {
         if (maxSize < 0)
             throw new IllegalArgumentException(Long.toString(maxSize));
-        return super.slice(0, maxSize);
+        return slice(0, maxSize);
     }
 
     @Override
-    public Stream<U> substream(long startingOffset) {
+    public final Stream<U> substream(long startingOffset) {
         if (startingOffset < 0)
             throw new IllegalArgumentException(Long.toString(startingOffset));
         if (startingOffset == 0)
             return this;
         else
-            return super.slice(startingOffset, -1);
+            return slice(startingOffset, -1);
     }
 
     @Override
-    public Stream<U> substream(long startingOffset, long endingOffset) {
+    public final Stream<U> substream(long startingOffset, long endingOffset) {
         if (startingOffset < 0 || endingOffset < startingOffset)
             throw new IllegalArgumentException(String.format("substream(%d, %d)", startingOffset, endingOffset));
-        return super.slice(startingOffset, endingOffset - startingOffset);
+        return slice(startingOffset, endingOffset - startingOffset);
     }
 
-
     // Terminal operations from Stream
 
     @Override
-    public void forEach(Consumer<? super U> consumer) {
+    public final void forEach(Consumer<? super U> consumer) {
         evaluate(ForEachOps.makeRef(consumer, false));
     }
 
     @Override
-    public void forEachOrdered(Consumer<? super U> consumer) {
+    public final void forEachOrdered(Consumer<? super U> consumer) {
         evaluate(ForEachOps.makeRef(consumer, true));
     }
 
     @Override
     @SuppressWarnings("unchecked")
-    public <A> A[] toArray(IntFunction<A[]> generator) {
+    public final <A> A[] toArray(IntFunction<A[]> generator) {
         // Since A has no relation to U (not possible to declare that A is an upper bound of U)
         // there will be no static type checking.
         // Therefore use a raw type and assume A == U rather than propagating the separation of A and U
@@ -326,62 +384,62 @@
     }
 
     @Override
-    public Object[] toArray() {
+    public final Object[] toArray() {
         return toArray(Object[]::new);
     }
 
     @Override
-    public boolean anyMatch(Predicate<? super U> predicate) {
+    public final boolean anyMatch(Predicate<? super U> predicate) {
         return evaluate(MatchOps.makeRef(predicate, MatchOps.MatchKind.ANY));
     }
 
     @Override
-    public boolean allMatch(Predicate<? super U> predicate) {
+    public final boolean allMatch(Predicate<? super U> predicate) {
         return evaluate(MatchOps.makeRef(predicate, MatchOps.MatchKind.ALL));
     }
 
     @Override
-    public boolean noneMatch(Predicate<? super U> predicate) {
+    public final boolean noneMatch(Predicate<? super U> predicate) {
         return evaluate(MatchOps.makeRef(predicate, MatchOps.MatchKind.NONE));
     }
 
     @Override
-    public Optional<U> findFirst() {
+    public final Optional<U> findFirst() {
         return evaluate(FindOps.makeRef(true));
     }
 
     @Override
-    public Optional<U> findAny() {
+    public final Optional<U> findAny() {
         return evaluate(FindOps.makeRef(false));
     }
 
     @Override
-    public U reduce(final U identity, final BinaryOperator<U> accumulator) {
+    public final U reduce(final U identity, final BinaryOperator<U> accumulator) {
         return evaluate(ReduceOps.makeRef(identity, accumulator, accumulator));
     }
 
     @Override
-    public Optional<U> reduce(BinaryOperator<U> accumulator) {
+    public final Optional<U> reduce(BinaryOperator<U> accumulator) {
         return evaluate(ReduceOps.makeRef(accumulator));
     }
 
     @Override
-    public <R> R reduce(R identity, BiFunction<R, ? super U, R> accumulator, BinaryOperator<R> combiner) {
+    public final <R> R reduce(R identity, BiFunction<R, ? super U, R> accumulator, BinaryOperator<R> combiner) {
         return evaluate(ReduceOps.makeRef(identity, accumulator, combiner));
     }
 
     @Override
-    public <R> R collect(Collector<? super U, R> collector) {
+    public final <R> R collect(Collector<? super U, R> collector) {
         return evaluate(ReduceOps.makeRef(collector));
     }
 
     @Override
-    public <R> R collect(Supplier<R> resultFactory, BiConsumer<R, ? super U> accumulator, BiConsumer<R, R> combiner) {
+    public final <R> R collect(Supplier<R> resultFactory, BiConsumer<R, ? super U> accumulator, BiConsumer<R, R> combiner) {
         return evaluate(ReduceOps.makeRef(resultFactory, accumulator, combiner));
     }
 
     @Override
-    public <R> R collectUnordered(Collector<? super U, R> collector) {
+    public final <R> R collectUnordered(Collector<? super U, R> collector) {
         if (collector.isConcurrent()) {
             R container = collector.resultSupplier().get();
             BiFunction<R, ? super U, R> accumulator = collector.accumulator();
@@ -394,18 +452,74 @@
     }
 
     @Override
-    public Optional<U> max(Comparator<? super U> comparator) {
+    public final Optional<U> max(Comparator<? super U> comparator) {
         return reduce(Comparators.greaterOf(comparator));
     }
 
     @Override
-    public Optional<U> min(Comparator<? super U> comparator) {
+    public final Optional<U> min(Comparator<? super U> comparator) {
         return reduce(Comparators.lesserOf(comparator));
 
     }
 
     @Override
-    public long count() {
+    public final long count() {
         return mapToLong(e -> 1L).sum();
     }
+
+
+    //
+
+    static class Head<E_IN, E_OUT> extends ReferencePipeline<E_IN, E_OUT> {
+        Head(Supplier<? extends Spliterator<?>> source, int sourceFlags) {
+            super(source, sourceFlags);
+        }
+
+        Head(Spliterator<?> source, int sourceFlags) {
+            super(source, sourceFlags);
+        }
+
+        @Override
+        final boolean opIsStateful() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        final Sink<E_IN> opWrapSink(int flags, Sink<E_OUT> sink) {
+            throw new UnsupportedOperationException();
+        }
+    }
+
+    abstract static class StatelessOp<E_IN, E_OUT> extends ReferencePipeline<E_IN, E_OUT> {
+        StatelessOp(AbstractPipeline<?, E_IN, ?> upstream,
+                    StreamShape inputShape,
+                    int opFlags) {
+            super(upstream, opFlags);
+            assert upstream.getOutputShape() == inputShape;
+        }
+
+        @Override
+        final boolean opIsStateful() {
+            return false;
+        }
+    }
+
+    abstract static class StatefulOp<E_IN, E_OUT> extends ReferencePipeline<E_IN, E_OUT> {
+        StatefulOp(AbstractPipeline<?, E_IN, ?> upstream,
+                   StreamShape inputShape,
+                   int opFlags) {
+            super(upstream, opFlags);
+            assert upstream.getOutputShape() == inputShape;
+        }
+
+        @Override
+        final boolean opIsStateful() {
+            return true;
+        }
+
+        @Override
+        abstract <P_IN> Node<E_OUT> opEvaluateParallel(PipelineHelper<E_OUT> helper,
+                                                       Spliterator<P_IN> spliterator,
+                                                       IntFunction<E_OUT[]> generator);
+    }
 }
--- a/src/share/classes/java/util/stream/SliceOp.java	Wed Apr 03 11:29:23 2013 -0700
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,460 +0,0 @@
-/*
- * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This code is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 only, as
- * published by the Free Software Foundation.  Oracle designates this
- * particular file as subject to the "Classpath" exception as provided
- * by Oracle in the LICENSE file that accompanied this code.
- *
- * This code is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
- * version 2 for more details (a copy is included in the LICENSE file that
- * accompanied this code).
- *
- * You should have received a copy of the GNU General Public License version
- * 2 along with this work; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
- *
- * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
- * or visit www.oracle.com if you need additional information or have any
- * questions.
- */
-package java.util.stream;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Spliterator;
-import java.util.concurrent.CountedCompleter;
-import java.util.function.IntFunction;
-
-/**
- * A {@link StatefulOp} that transforms a stream into a subsequence of itself.
- * @param <T> The input and output type of the stream pipeline
- * @since 1.8
- */
-final class SliceOp<T> implements StatefulOp<T> {
-    /** The number of elements to skip */
-    private final long skip;
-
-    /** The maximum size of the resulting stream, or -1 if no limit is to be imposed */
-    private final long limit;
-
-    /** The stream shape of the input and output streams */
-    private final StreamShape shape;
-
-    /** The sink transform for this operation */
-    private final AbstractPipeline.SinkWrapper sinkWrapper;
-
-    /**
-     * Construct a {@code SliceOp}, which may be a skip-only, limit-only, or skip-and-limit.
-     * @param skip The number of elements to skip.  Must be >= 0.
-     * @param limit The maximum size of the resulting stream, or -1 if no limit is to be imposed
-     * @param shape The stream shape of the input and output streams
-     */
-    public SliceOp(long skip, long limit, StreamShape shape) {
-        if (skip < 0)
-            throw new IllegalArgumentException("Skip must be non-negative: " + skip);
-        this.skip = skip;
-        this.limit = limit;
-        this.shape = shape;
-        switch (shape) {
-            case REFERENCE: sinkWrapper = this::wrapSinkRef; break;
-            case INT_VALUE: sinkWrapper = this::wrapSinkInt; break;
-            case LONG_VALUE: sinkWrapper = this::wrapSinkLong; break;
-            case DOUBLE_VALUE: sinkWrapper = this::wrapSinkDouble; break;
-            default:
-                throw new IllegalStateException("Unknown shape: " + shape);
-        }
-    }
-
-    /** Sink transform function for reference streams */
-    private<T> Sink<T> wrapSinkRef(int flags, Sink sink) {
-        return new Sink.ChainedReference<T>(sink) {
-            long n = skip;
-            long m = limit >= 0 ? limit : Long.MAX_VALUE;
-
-            @Override
-            public void accept(T t) {
-                if (n == 0) {
-                    if (m > 0) {
-                        m--;
-                        downstream.accept(t);
-                    }
-                }
-                else {
-                    n--;
-                }
-            }
-
-            @Override
-            public boolean cancellationRequested() {
-                return m == 0 || downstream.cancellationRequested();
-            }
-        };
-    }
-
-    /** Sink transform function for int streams */
-    private Sink<Integer> wrapSinkInt(int flags, Sink sink) {
-        return new Sink.ChainedInt(sink) {
-            long n = skip;
-            long m = limit >= 0 ? limit : Long.MAX_VALUE;
-
-            @Override
-            public void accept(int t) {
-                if (n == 0) {
-                    if (m > 0) {
-                        m--;
-                        downstream.accept(t);
-                    }
-                }
-                else {
-                    n--;
-                }
-            }
-
-            @Override
-            public boolean cancellationRequested() {
-                return m == 0 || downstream.cancellationRequested();
-            }
-        };
-    }
-
-    /** Sink transform function for long streams */
-    private Sink<Long> wrapSinkLong(int flags, Sink sink) {
-        return new Sink.ChainedLong(sink) {
-            long n = skip;
-            long m = limit >= 0 ? limit : Long.MAX_VALUE;
-
-            @Override
-            public void accept(long t) {
-                if (n == 0) {
-                    if (m > 0) {
-                        m--;
-                        downstream.accept(t);
-                    }
-                }
-                else {
-                    n--;
-                }
-            }
-
-            @Override
-            public boolean cancellationRequested() {
-                return m == 0 || downstream.cancellationRequested();
-            }
-        };
-    }
-
-    /** Sink transform function for double streams */
-    private Sink<Double> wrapSinkDouble(int flags, Sink sink) {
-        return new Sink.ChainedDouble(sink) {
-            long n = skip;
-            long m = limit >= 0 ? limit : Long.MAX_VALUE;
-
-            @Override
-            public void accept(double t) {
-                if (n == 0) {
-                    if (m > 0) {
-                        m--;
-                        downstream.accept(t);
-                    }
-                }
-                else {
-                    n--;
-                }
-            }
-
-            @Override
-            public boolean cancellationRequested() {
-                return m == 0 || downstream.cancellationRequested();
-            }
-        };
-    }
-
-    @Override
-    public StreamShape outputShape() {
-        return shape;
-    }
-
-    @Override
-    public StreamShape inputShape() {
-        return shape;
-    }
-
-    @Override
-    public int getOpFlags() {
-        return StreamOpFlag.NOT_SIZED | ((limit != -1) ? StreamOpFlag.IS_SHORT_CIRCUIT : 0);
-    }
-
-    @Override
-    public Sink<T> wrapSink(int flags, Sink sink) {
-        return sinkWrapper.wrapSink(flags, sink);
-    }
-
-//    private long getFinalSize(PipelineHelper helper, Spliterator spliterator) {
-//        long size = helper.exactOutputSizeIfKnown(spliterator);
-//        if (size >= 0) {
-//            size = Math.max(0, size - skip);
-//            if (limit >= 0)
-//                size = Math.min(size, limit);
-//        }
-//        return size;
-//    }
-
-    @Override
-    public <S> Node<T> evaluateParallel(PipelineHelper<S, T> helper, Spliterator<S> spliterator, IntFunction<T[]> generator) {
-        // Parallel strategy -- two cases
-        // IF we have full size information
-        // - decompose, keeping track of each leaf's (offset, size)
-        // - calculate leaf only if intersection between (offset, size) and desired slice
-        // - Construct a Node containing the appropriate sections of the appropriate leaves
-        // IF we don't
-        // - decompose, and calculate size of each leaf
-        // - on complete of any node, compute completed initial size from the root, and if big enough, cancel later nodes
-        // - @@@ this can be significantly improved
-
-        // @@@ Currently we don't do the sized version at all
-
-        // @@@ Should take into account ORDERED flag; if not ORDERED, we can limit in temporal order instead
-
-//        Spliterator<S> spliterator = helper.spliterator();
-//        int size = spliterator.getSizeIfKnown();
-//        if (size >= 0 && helper.getOutputSizeIfKnown() == size && spliterator.isPredictableSplits())
-//            return helper.invoke(new SizedSliceTask<>(helper, skip, getFinalSize(helper, spliterator)));
-//        else
-              return new SliceTask<>(this, helper, spliterator, generator, skip, limit).invoke();
-    }
-
-    @Override
-    public String toString() {
-        return String.format("SliceOp[skip=%d,limit=%d]", skip, limit);
-    }
-
-    /**
-     * {@code ForkJoinTask} implementing slice computation
-     *
-     * @param <S> Input element type to the stream pipeline
-     * @param <T> Output element type from the stream pipeline
-     */
-    private static final class SliceTask<S, T> extends AbstractShortCircuitTask<S, T, Node<T>, SliceTask<S, T>> {
-        private final SliceOp<T> op;
-        private final IntFunction<T[]> generator;
-        private final long targetOffset, targetSize;
-        private long thisNodeSize;
-
-        private volatile boolean completed;
-
-        SliceTask(SliceOp<T> op, PipelineHelper<S, T> helper, Spliterator<S> spliterator,
-                  IntFunction<T[]> generator, long offset, long size) {
-            super(helper, spliterator);
-            this.op = op;
-            this.generator = generator;
-            this.targetOffset = offset;
-            this.targetSize = size;
-        }
-
-        SliceTask(SliceTask<S, T> parent, Spliterator<S> spliterator) {
-            super(parent, spliterator);
-            this.op = parent.op;
-            this.generator = parent.generator;
-            this.targetOffset = parent.targetOffset;
-            this.targetSize = parent.targetSize;
-        }
-
-        @Override
-        protected SliceTask<S, T> makeChild(Spliterator<S> spliterator) {
-            return new SliceTask<>(this, spliterator);
-        }
-
-        @Override
-        protected final Node<T> getEmptyResult() {
-            return Nodes.createEmptyNode(op.shape);
-        }
-
-        @Override
-        protected final Node<T> doLeaf() {
-            if (isRoot()) {
-                return helper.evaluateSequential(op, spliterator, generator);
-            }
-            else {
-                Node<T> node = helper.wrapAndCopyInto(helper.makeNodeBuilder(-1, generator),
-                                                      spliterator).build();
-                thisNodeSize = node.count();
-                completed = true;
-                return node;
-            }
-        }
-
-        @Override
-        public final void onCompletion(CountedCompleter<?> caller) {
-            if (!isLeaf()) {
-                thisNodeSize = 0;
-                for (SliceTask<S, T> child = children; child != null; child = child.nextSibling)
-                    thisNodeSize += child.thisNodeSize;
-                completed = true;
-
-                if (isRoot()) {
-                    // Only collect nodes once absolute size information is known
-
-                    ArrayList<Node<T>> nodes = new ArrayList<>();
-                    visit(nodes, 0);
-                    Node<T> result;
-                    if (nodes.size() == 0)
-                        result = Nodes.createEmptyNode(op.shape);
-                    else if (nodes.size() == 1)
-                        result = nodes.get(0);
-                    else
-                        // This will create a tree of depth 1 and will not be a sub-tree
-                        // for leaf nodes within the require range
-                        result = Nodes.createConcNode(op.shape, nodes);
-                    setLocalResult(result);
-                }
-            }
-            if (targetSize >= 0) {
-                if (((SliceTask<S,T>) getRoot()).leftSize() >= targetOffset + targetSize)
-                    cancelLaterNodes();
-            }
-            // Don't call super.onCompletion(), we don't look at the child nodes until farther up the tree
-        }
-
-        private long leftSize() {
-            if (completed)
-                return thisNodeSize;
-            else if (isLeaf())
-                return 0;
-            else {
-                long leftSize = 0;
-                for (SliceTask<S, T> child = children; child != null; child = child.nextSibling) {
-                    if (child.completed)
-                        leftSize += child.thisNodeSize;
-                    else {
-                        leftSize += child.leftSize();
-                        break;
-                    }
-                }
-                return leftSize;
-            }
-        }
-
-        private void visit(List<Node<T>> results, int offset) {
-            if (!isLeaf()) {
-                for (SliceTask<S, T> child = children; child != null; child = child.nextSibling) {
-                    child.visit(results, offset);
-                    offset += child.thisNodeSize;
-                }
-            }
-            else {
-                if (results.size() == 0) {
-                    if (offset + thisNodeSize >= targetOffset)
-                        results.add(truncateNode(getLocalResult(),
-                                                 Math.max(0, targetOffset - offset),
-                                                 targetSize >= 0 ? Math.max(0, offset + thisNodeSize - (targetOffset + targetSize)) : 0));
-                }
-                else {
-                    if (targetSize == -1 || offset < targetOffset + targetSize) {
-                        results.add(truncateNode(getLocalResult(),
-                                                 0,
-                                                 targetSize >= 0 ? Math.max(0, offset + thisNodeSize - (targetOffset + targetSize)) : 0));
-                    }
-                }
-            }
-        }
-
-        private Node<T> truncateNode(Node<T> input, long skipLeft, long skipRight) {
-            if (skipLeft == 0 && skipRight == 0)
-                return input;
-            else {
-                return Nodes.createTruncatedNode(input, skipLeft, thisNodeSize - skipRight, generator);
-            }
-        }
-    }
-
-    // @@@ Currently unused -- optimization for when all sizes are known
-//    private static class SizedSliceTask<S, T> extends AbstractShortCircuitTask<S, T, Node<T>, SizedSliceTask<S, T>> {
-//        private final int targetOffset, targetSize;
-//        private final int offset, size;
-//
-//        private SizedSliceTask(ParallelPipelineHelper<S, T> helper, int offset, int size) {
-//            super(helper);
-//            targetOffset = offset;
-//            targetSize = size;
-//            this.offset = 0;
-//            this.size = spliterator.getSizeIfKnown();
-//        }
-//
-//        private SizedSliceTask(SizedSliceTask<S, T> parent, Spliterator<S> spliterator) {
-//            // Makes assumptions about order in which siblings are created and linked into parent!
-//            super(parent, spliterator);
-//            targetOffset = parent.targetOffset;
-//            targetSize = parent.targetSize;
-//            int siblingSizes = 0;
-//            for (SizedSliceTask<S, T> sibling = parent.children; sibling != null; sibling = sibling.nextSibling)
-//                siblingSizes += sibling.size;
-//            size = spliterator.getSizeIfKnown();
-//            offset = parent.offset + siblingSizes;
-//        }
-//
-//        @Override
-//        protected SizedSliceTask<S, T> makeChild(Spliterator<S> spliterator) {
-//            return new SizedSliceTask<>(this, spliterator);
-//        }
-//
-//        @Override
-//        protected Node<T> getEmptyResult() {
-//            return Nodes.emptyNode();
-//        }
-//
-//        @Override
-//        public boolean taskCancelled() {
-//            if (offset > targetOffset+targetSize || offset+size < targetOffset)
-//                return true;
-//            else
-//                return super.taskCancelled();
-//        }
-//
-//        @Override
-//        protected Node<T> doLeaf() {
-//            int skipLeft = Math.max(0, targetOffset - offset);
-//            int skipRight = Math.max(0, offset + size - (targetOffset + targetSize));
-//            if (skipLeft == 0 && skipRight == 0)
-//                return helper.into(Nodes.<T>makeBuilder(spliterator.getSizeIfKnown())).build();
-//            else {
-//                // If we're the first or last node that intersects the target range, peel off irrelevant elements
-//                int truncatedSize = size - skipLeft - skipRight;
-//                NodeBuilder<T> builder = Nodes.<T>makeBuilder(truncatedSize);
-//                Sink<S> wrappedSink = helper.wrapSink(builder);
-//                wrappedSink.begin(truncatedSize);
-//                Iterator<S> iterator = spliterator.iterator();
-//                for (int i=0; i<skipLeft; i++)
-//                    iterator.next();
-//                for (int i=0; i<truncatedSize; i++)
-//                    wrappedSink.apply(iterator.next());
-//                wrappedSink.end();
-//                return builder.build();
-//            }
-//        }
-//
-//        @Override
-//        public void onCompletion(CountedCompleter<?> caller) {
-//            if (!isLeaf()) {
-//                Node<T> result = null;
-//                for (SizedSliceTask<S, T> child = children.nextSibling; child != null; child = child.nextSibling) {
-//                    Node<T> childResult = child.getRawResult();
-//                    if (childResult == null)
-//                        continue;
-//                    else if (result == null)
-//                        result = childResult;
-//                    else
-//                        result = Nodes.node(result, childResult);
-//                }
-//                setRawResult(result);
-//                if (offset <= targetOffset && offset+size >= targetOffset+targetSize)
-//                    shortCircuit(result);
-//            }
-//        }
-//    }
-
-}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/share/classes/java/util/stream/SliceOps.java	Wed Apr 03 21:51:55 2013 +0200
@@ -0,0 +1,494 @@
+/*
+ * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.  Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+package java.util.stream;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Spliterator;
+import java.util.concurrent.CountedCompleter;
+import java.util.function.IntFunction;
+
+/**
+ * Factory methods for transforming a stream into a subsequence of itself.
+ * @since 1.8
+ */
+final class SliceOps {
+
+    private SliceOps() { }
+
+    /**
+     * Appends a "slice" operation to the provided stream.  The slice operation
+     * may be skip-only, limit-only, or skip-and-limit.
+     *
+     * @param <T> The type of both input and output elements
+     * @param upstream A reference stream with element type T
+     * @param skip The number of elements to skip.  Must be >= 0.
+     * @param limit The maximum size of the resulting stream, or -1 if no limit
+     *        is to be imposed
+     */
+    public static<T> Stream<T> makeRef(AbstractPipeline<?, T, ?> upstream, long skip, long limit) {
+        if (skip < 0)
+            throw new IllegalArgumentException("Skip must be non-negative: " + skip);
+
+        return new ReferencePipeline.StatefulOp<T,T>(upstream, StreamShape.REFERENCE,
+                                                     flags(limit)) {
+            @Override
+            <P_IN> Node<T> opEvaluateParallel(PipelineHelper<T> helper,
+                                              Spliterator<P_IN> spliterator,
+                                              IntFunction<T[]> generator) {
+                return new SliceTask<>(this, helper, spliterator, generator, skip, limit).invoke();
+            }
+
+            @Override
+            Sink<T> opWrapSink(int flags, Sink<T> sink) {
+                return new Sink.ChainedReference<T>(sink) {
+                    long n = skip;
+                    long m = limit >= 0 ? limit : Long.MAX_VALUE;
+
+                    @Override
+                    public void accept(T t) {
+                        if (n == 0) {
+                            if (m > 0) {
+                                m--;
+                                downstream.accept(t);
+                            }
+                        }
+                        else {
+                            n--;
+                        }
+                    }
+
+                    @Override
+                    public boolean cancellationRequested() {
+                        return m == 0 || downstream.cancellationRequested();
+                    }
+                };
+            }
+        };
+    }
+
+    /**
+     * Appends a "slice" operation to the provided IntStream.  The slice
+     * operation may be skip-only, limit-only, or skip-and-limit.
+     *
+     * @param upstream An IntStream
+     * @param skip The number of elements to skip.  Must be >= 0.
+     * @param limit The maximum size of the resulting stream, or -1 if no limit
+     *        is to be imposed
+     */
+    public static IntStream makeInt(AbstractPipeline<?, Integer, ?> upstream, long skip, long limit) {
+        if (skip < 0)
+            throw new IllegalArgumentException("Skip must be non-negative: " + skip);
+
+        return new IntPipeline.StatefulOp<Integer>(upstream, StreamShape.INT_VALUE,
+                                                   flags(limit)) {
+            @Override
+            <P_IN> Node<Integer> opEvaluateParallel(PipelineHelper<Integer> helper,
+                                                    Spliterator<P_IN> spliterator,
+                                                    IntFunction<Integer[]> generator) {
+                return new SliceTask<>(this, helper, spliterator, generator, skip, limit).invoke();
+            }
+
+            @Override
+            Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) {
+                return new Sink.ChainedInt(sink) {
+                    long n = skip;
+                    long m = limit >= 0 ? limit : Long.MAX_VALUE;
+
+                    @Override
+                    public void accept(int t) {
+                        if (n == 0) {
+                            if (m > 0) {
+                                m--;
+                                downstream.accept(t);
+                            }
+                        }
+                        else {
+                            n--;
+                        }
+                    }
+
+                    @Override
+                    public boolean cancellationRequested() {
+                        return m == 0 || downstream.cancellationRequested();
+                    }
+                };
+            }
+        };
+    }
+
+    /**
+     * Appends a "slice" operation to the provided LongStream.  The slice
+     * operation may be skip-only, limit-only, or skip-and-limit.
+     *
+     * @param upstream A LongStream
+     * @param skip The number of elements to skip.  Must be >= 0.
+     * @param limit The maximum size of the resulting stream, or -1 if no limit
+     *        is to be imposed
+     */
+    public static LongStream makeLong(AbstractPipeline<?, Long, ?> upstream, long skip, long limit) {
+        if (skip < 0)
+            throw new IllegalArgumentException("Skip must be non-negative: " + skip);
+
+        return new LongPipeline.StatefulOp<Long>(upstream, StreamShape.LONG_VALUE,
+                                                 flags(limit)) {
+            @Override
+            <P_IN> Node<Long> opEvaluateParallel(PipelineHelper<Long> helper,
+                                                 Spliterator<P_IN> spliterator,
+                                                 IntFunction<Long[]> generator) {
+                return new SliceTask<>(this, helper, spliterator, generator, skip, limit).invoke();
+            }
+
+            @Override
+            Sink<Long> opWrapSink(int flags, Sink<Long> sink) {
+                return new Sink.ChainedLong(sink) {
+                    long n = skip;
+                    long m = limit >= 0 ? limit : Long.MAX_VALUE;
+
+                    @Override
+                    public void accept(long t) {
+                        if (n == 0) {
+                            if (m > 0) {
+                                m--;
+                                downstream.accept(t);
+                            }
+                        }
+                        else {
+                            n--;
+                        }
+                    }
+
+                    @Override
+                    public boolean cancellationRequested() {
+                        return m == 0 || downstream.cancellationRequested();
+                    }
+                };
+            }
+        };
+    }
+
+    /**
+     * Appends a "slice" operation to the provided DoubleStream.  The slice
+     * operation may be skip-only, limit-only, or skip-and-limit.
+     *
+     * @param upstream A DoubleStream
+     * @param skip The number of elements to skip.  Must be >= 0.
+     * @param limit The maximum size of the resulting stream, or -1 if no limit
+     *        is to be imposed
+     */
+    public static DoubleStream makeDouble(AbstractPipeline<?, Double, ?> upstream, long skip, long limit) {
+        if (skip < 0)
+            throw new IllegalArgumentException("Skip must be non-negative: " + skip);
+
+        return new DoublePipeline.StatefulOp<Double>(upstream, StreamShape.DOUBLE_VALUE,
+                                                     flags(limit)) {
+            @Override
+            <P_IN> Node<Double> opEvaluateParallel(PipelineHelper<Double> helper,
+                                                   Spliterator<P_IN> spliterator,
+                                                   IntFunction<Double[]> generator) {
+                return new SliceTask<>(this, helper, spliterator, generator, skip, limit).invoke();
+            }
+
+            @Override
+            Sink<Double> opWrapSink(int flags, Sink<Double> sink) {
+                return new Sink.ChainedDouble(sink) {
+                    long n = skip;
+                    long m = limit >= 0 ? limit : Long.MAX_VALUE;
+
+                    @Override
+                    public void accept(double t) {
+                        if (n == 0) {
+                            if (m > 0) {
+                                m--;
+                                downstream.accept(t);
+                            }
+                        }
+                        else {
+                            n--;
+                        }
+                    }
+
+                    @Override
+                    public boolean cancellationRequested() {
+                        return m == 0 || downstream.cancellationRequested();
+                    }
+                };
+            }
+        };
+    }
+
+    private static int flags(long limit) {
+        return StreamOpFlag.NOT_SIZED | ((limit != -1) ? StreamOpFlag.IS_SHORT_CIRCUIT : 0);
+    }
+
+    // Parallel strategy -- two cases
+    // IF we have full size information
+    // - decompose, keeping track of each leaf's (offset, size)
+    // - calculate leaf only if intersection between (offset, size) and desired slice
+    // - Construct a Node containing the appropriate sections of the appropriate leaves
+    // IF we don't
+    // - decompose, and calculate size of each leaf
+    // - on complete of any node, compute completed initial size from the root, and if big enough, cancel later nodes
+    // - @@@ this can be significantly improved
+
+    // @@@ Currently we don't do the sized version at all
+
+    // @@@ Should take into account ORDERED flag; if not ORDERED, we can limit in temporal order instead
+
+    /**
+     * {@code ForkJoinTask} implementing slice computation
+     *
+     * @param <S> Input element type to the stream pipeline
+     * @param <T> Output element type from the stream pipeline
+     */
+    private static final class SliceTask<S, T> extends AbstractShortCircuitTask<S, T, Node<T>, SliceTask<S, T>> {
+        private final AbstractPipeline<T, T, ?> op;
+        private final IntFunction<T[]> generator;
+        private final long targetOffset, targetSize;
+        private long thisNodeSize;
+
+        private volatile boolean completed;
+
+        SliceTask(AbstractPipeline<?, T, ?> op, PipelineHelper<T> helper, Spliterator<S> spliterator,
+                  IntFunction<T[]> generator, long offset, long size) {
+            super(helper, spliterator);
+            this.op = (AbstractPipeline<T, T, ?>) op;
+            this.generator = generator;
+            this.targetOffset = offset;
+            this.targetSize = size;
+        }
+
+        SliceTask(SliceTask<S, T> parent, Spliterator<S> spliterator) {
+            super(parent, spliterator);
+            this.op = parent.op;
+            this.generator = parent.generator;
+            this.targetOffset = parent.targetOffset;
+            this.targetSize = parent.targetSize;
+        }
+
+        @Override
+        protected SliceTask<S, T> makeChild(Spliterator<S> spliterator) {
+            return new SliceTask<>(this, spliterator);
+        }
+
+        @Override
+        protected final Node<T> getEmptyResult() {
+            return Nodes.createEmptyNode(op.getOutputShape());
+        }
+
+        @Override
+        protected final Node<T> doLeaf() {
+            if (isRoot()) {
+                long sizeIfKnown = StreamOpFlag.SIZED.isPreserved(op.sourceOrOpFlags)
+                                   ? op.exactOutputSizeIfKnown(spliterator)
+                                   : -1;
+                final Node.Builder<T> nb = op.makeNodeBuilder(sizeIfKnown, generator);
+                Sink<T> opSink = op.opWrapSink(op.sourceOrOpFlags, nb);
+
+                if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(op.sourceOrOpFlags))
+                    helper.wrapAndCopyInto(opSink, spliterator);
+                else
+                    helper.copyIntoWithCancel(helper.wrapSink(opSink), spliterator);
+                return nb.build();
+            }
+            else {
+                Node<T> node = helper.wrapAndCopyInto(helper.makeNodeBuilder(-1, generator),
+                                                      spliterator).build();
+                thisNodeSize = node.count();
+                completed = true;
+                return node;
+            }
+        }
+
+        @Override
+        public final void onCompletion(CountedCompleter<?> caller) {
+            if (!isLeaf()) {
+                thisNodeSize = 0;
+                for (SliceTask<S, T> child = children; child != null; child = child.nextSibling)
+                    thisNodeSize += child.thisNodeSize;
+                completed = true;
+
+                if (isRoot()) {
+                    // Only collect nodes once absolute size information is known
+
+                    ArrayList<Node<T>> nodes = new ArrayList<>();
+                    visit(nodes, 0);
+                    Node<T> result;
+                    if (nodes.size() == 0)
+                        result = Nodes.createEmptyNode(op.getOutputShape());
+                    else if (nodes.size() == 1)
+                        result = nodes.get(0);
+                    else
+                        // This will create a tree of depth 1 and will not be a sub-tree
+                        // for leaf nodes within the required range
+                        result = Nodes.createConcNode(op.getOutputShape(), nodes);
+                    setLocalResult(result);
+                }
+            }
+            if (targetSize >= 0) {
+                if (((SliceTask<S,T>) getRoot()).leftSize() >= targetOffset + targetSize)
+                    cancelLaterNodes();
+            }
+            // Don't call super.onCompletion(), we don't look at the child nodes until farther up the tree
+        }
+
+        private long leftSize() {
+            if (completed)
+                return thisNodeSize;
+            else if (isLeaf())
+                return 0;
+            else {
+                long leftSize = 0;
+                for (SliceTask<S, T> child = children; child != null; child = child.nextSibling) {
+                    if (child.completed)
+                        leftSize += child.thisNodeSize;
+                    else {
+                        leftSize += child.leftSize();
+                        break;
+                    }
+                }
+                return leftSize;
+            }
+        }
+
+        private void visit(List<Node<T>> results, int offset) {
+            if (!isLeaf()) {
+                for (SliceTask<S, T> child = children; child != null; child = child.nextSibling) {
+                    child.visit(results, offset);
+                    offset += child.thisNodeSize;
+                }
+            }
+            else {
+                if (results.size() == 0) {
+                    if (offset + thisNodeSize >= targetOffset)
+                        results.add(truncateNode(getLocalResult(),
+                                                 Math.max(0, targetOffset - offset),
+                                                 targetSize >= 0 ? Math.max(0, offset + thisNodeSize - (targetOffset + targetSize)) : 0));
+                }
+                else {
+                    if (targetSize == -1 || offset < targetOffset + targetSize) {
+                        results.add(truncateNode(getLocalResult(),
+                                                 0,
+                                                 targetSize >= 0 ? Math.max(0, offset + thisNodeSize - (targetOffset + targetSize)) : 0));
+                    }
+                }
+            }
+        }
+
+        private Node<T> truncateNode(Node<T> input, long skipLeft, long skipRight) {
+            if (skipLeft == 0 && skipRight == 0)
+                return input;
+            else {
+                return Nodes.createTruncatedNode(input, skipLeft, thisNodeSize - skipRight, generator);
+            }
+        }
+    }
+
+    // @@@ Currently unused -- optimization for when all sizes are known
+//    private static class SizedSliceTask<S, T> extends AbstractShortCircuitTask<S, T, Node<T>, SizedSliceTask<S, T>> {
+//        private final int targetOffset, targetSize;
+//        private final int offset, size;
+//
+//        private SizedSliceTask(ParallelPipelineHelper<S, T> helper, int offset, int size) {
+//            super(helper);
+//            targetOffset = offset;
+//            targetSize = size;
+//            this.offset = 0;
+//            this.size = spliterator.getSizeIfKnown();
+//        }
+//
+//        private SizedSliceTask(SizedSliceTask<S, T> parent, Spliterator<S> spliterator) {
+//            // Makes assumptions about order in which siblings are created and linked into parent!
+//            super(parent, spliterator);
+//            targetOffset = parent.targetOffset;
+//            targetSize = parent.targetSize;
+//            int siblingSizes = 0;
+//            for (SizedSliceTask<S, T> sibling = parent.children; sibling != null; sibling = sibling.nextSibling)
+//                siblingSizes += sibling.size;
+//            size = spliterator.getSizeIfKnown();
+//            offset = parent.offset + siblingSizes;
+//        }
+//
+//        @Override
+//        protected SizedSliceTask<S, T> makeChild(Spliterator<S> spliterator) {
+//            return new SizedSliceTask<>(this, spliterator);
+//        }
+//
+//        @Override
+//        protected Node<T> getEmptyResult() {
+//            return Nodes.emptyNode();
+//        }
+//
+//        @Override
+//        public boolean taskCancelled() {
+//            if (offset > targetOffset+targetSize || offset+size < targetOffset)
+//                return true;
+//            else
+//                return super.taskCancelled();
+//        }
+//
+//        @Override
+//        protected Node<T> doLeaf() {
+//            int skipLeft = Math.max(0, targetOffset - offset);
+//            int skipRight = Math.max(0, offset + size - (targetOffset + targetSize));
+//            if (skipLeft == 0 && skipRight == 0)
+//                return helper.into(Nodes.<T>makeBuilder(spliterator.getSizeIfKnown())).build();
+//            else {
+//                // If we're the first or last node that intersects the target range, peel off irrelevant elements
+//                int truncatedSize = size - skipLeft - skipRight;
+//                NodeBuilder<T> builder = Nodes.<T>makeBuilder(truncatedSize);
+//                Sink<S> wrappedSink = helper.wrapSink(builder);
+//                wrappedSink.begin(truncatedSize);
+//                Iterator<S> iterator = spliterator.iterator();
+//                for (int i=0; i<skipLeft; i++)
+//                    iterator.next();
+//                for (int i=0; i<truncatedSize; i++)
+//                    wrappedSink.apply(iterator.next());
+//                wrappedSink.end();
+//                return builder.build();
+//            }
+//        }
+//
+//        @Override
+//        public void onCompletion(CountedCompleter<?> caller) {
+//            if (!isLeaf()) {
+//                Node<T> result = null;
+//                for (SizedSliceTask<S, T> child = children.nextSibling; child != null; child = child.nextSibling) {
+//                    Node<T> childResult = child.getRawResult();
+//                    if (childResult == null)
+//                        continue;
+//                    else if (result == null)
+//                        result = childResult;
+//                    else
+//                        result = Nodes.node(result, childResult);
+//                }
+//                setRawResult(result);
+//                if (offset <= targetOffset && offset+size >= targetOffset+targetSize)
+//                    shortCircuit(result);
+//            }
+//        }
+//    }
+
+}
--- a/src/share/classes/java/util/stream/SortedOp.java	Wed Apr 03 11:29:23 2013 -0700
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,491 +0,0 @@
-/*
- * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This code is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 only, as
- * published by the Free Software Foundation.  Oracle designates this
- * particular file as subject to the "Classpath" exception as provided
- * by Oracle in the LICENSE file that accompanied this code.
- *
- * This code is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
- * version 2 for more details (a copy is included in the LICENSE file that
- * accompanied this code).
- *
- * You should have received a copy of the GNU General Public License version
- * 2 along with this work; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
- *
- * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
- * or visit www.oracle.com if you need additional information or have any
- * questions.
- */
-package java.util.stream;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Comparator;
-import java.util.Comparators;
-import java.util.Objects;
-import java.util.Spliterator;
-import java.util.concurrent.ForkJoinTask;
-import java.util.function.IntFunction;
-
-
-/**
- * A {@link StatefulOp} that sorts the elements of stream.
- *
- * @param <T> Type of elements to be sorted.
- *
- * @since 1.8
- */
-abstract class SortedOp<T> implements StatefulOp<T> {
-    /** The stream shape of the input and output stream */
-    private final StreamShape shape;
-
-    protected SortedOp(StreamShape shape) {
-        this.shape = shape;
-    }
-
-    /** Specialized subtype for sorting reference streams */
-    static final class OfRef<T> extends SortedOp<T> {
-        /** Comparator used for sorting */
-        private final boolean isNaturalSort;
-        private final Comparator<? super T> comparator;
-
-        /**
-         * Sort using natural order of {@literal <T>} which must be
-         * {@code Comparable}.
-         */
-        OfRef() {
-            super(StreamShape.REFERENCE);
-            this.isNaturalSort = true;
-            // Will throw CCE when we try to sort if T is not Comparable
-            this.comparator = (Comparator<? super T>) Comparators.naturalOrder();
-        }
-
-        /**
-         * Sort using the provided comparator.
-         *
-         * @param comparator The comparator to be used to evaluate ordering.
-         */
-        OfRef(Comparator<? super T> comparator) {
-            super(StreamShape.REFERENCE);
-            this.isNaturalSort = false;
-            this.comparator = Objects.requireNonNull(comparator);
-        }
-
-        @Override
-        public int getOpFlags() {
-            return StreamOpFlag.IS_ORDERED |
-                   (isNaturalSort ? StreamOpFlag.IS_SORTED : StreamOpFlag.NOT_SORTED);
-        }
-
-        @Override
-        public Sink<T> wrapSink(int flags, Sink sink) {
-            Objects.requireNonNull(sink);
-
-            // If the input is already naturally sorted and this operation
-            // also naturally sorted then this is a no-op
-            if (StreamOpFlag.SORTED.isKnown(flags) && isNaturalSort)
-                return sink;
-            else if (StreamOpFlag.SIZED.isKnown(flags))
-                return new SizedRefSortingSink<>(sink, comparator);
-            else
-                return new RefSortingSink<>(sink, comparator);
-        }
-
-        @Override
-        public <P_IN> Node<T> evaluateParallel(PipelineHelper<P_IN, T> helper,
-                                               Spliterator<P_IN> spliterator,
-                                               IntFunction<T[]> generator) {
-            // If the input is already naturally sorted and this operation
-            // naturally sorts then collect the output
-            if (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags()) && isNaturalSort) {
-                return helper.evaluate(spliterator, false, generator);
-            }
-            else {
-                // @@@ Weak two-pass parallel implementation; parallel collect, parallel sort
-                T[] flattenedData = helper.evaluate(spliterator, true, generator).asArray(generator);
-                Arrays.parallelSort(flattenedData, comparator);
-                return Nodes.node(flattenedData);
-            }
-        }
-    }
-
-    /** Specialized subtype for sorting int streams */
-    static final class OfInt extends SortedOp<Integer> {
-        OfInt() {
-            super(StreamShape.INT_VALUE);
-        }
-
-        @Override
-        public Sink<Integer> wrapSink(int flags, Sink sink) {
-            Objects.requireNonNull(sink);
-
-            if (StreamOpFlag.SORTED.isKnown(flags))
-                return sink;
-            else if (StreamOpFlag.SIZED.isKnown(flags))
-                return new SizedIntSortingSink(sink);
-            else
-                return new IntSortingSink(sink);
-        }
-
-        @Override
-        public <P_IN> Node<Integer> evaluateParallel(PipelineHelper<P_IN, Integer> helper,
-                                                     Spliterator<P_IN> spliterator,
-                                                     IntFunction<Integer[]> generator) {
-            if (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags())) {
-                return helper.evaluate(spliterator, false, generator);
-            }
-            else {
-                Node.OfInt n = (Node.OfInt) helper.evaluate(spliterator, true, generator);
-
-                int[] content = n.asIntArray();
-                Arrays.parallelSort(content);
-
-                return Nodes.intNode(content);
-            }
-        }
-    }
-
-    /** Specialized subtype for sorting long streams */
-    static final class OfLong extends SortedOp<Long> {
-        OfLong() {
-            super(StreamShape.LONG_VALUE);
-        }
-
-        @Override
-        public Sink<Long> wrapSink(int flags, Sink sink) {
-            Objects.requireNonNull(sink);
-
-            if (StreamOpFlag.SORTED.isKnown(flags))
-                return sink;
-            else if (StreamOpFlag.SIZED.isKnown(flags))
-                return new SizedLongSortingSink(sink);
-            else
-                return new LongSortingSink(sink);
-        }
-
-        @Override
-        public <P_IN> Node<Long> evaluateParallel(PipelineHelper<P_IN, Long> helper,
-                                                  Spliterator<P_IN> spliterator,
-                                                  IntFunction<Long[]> generator) {
-            if (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags())) {
-                return helper.evaluate(spliterator, false, generator);
-            }
-            else {
-                Node.OfLong n = (Node.OfLong) helper.evaluate(spliterator, true, generator);
-
-                long[] content = n.asLongArray();
-                Arrays.parallelSort(content);
-
-                return Nodes.longNode(content);
-            }
-        }
-    }
-
-    /** Specialized subtype for sorting double streams */
-    static final class OfDouble extends SortedOp<Double> {
-        OfDouble() {
-            super(StreamShape.DOUBLE_VALUE);
-        }
-
-        @Override
-        public Sink<Double> wrapSink(int flags, Sink sink) {
-            Objects.requireNonNull(sink);
-
-            if (StreamOpFlag.SORTED.isKnown(flags))
-                return sink;
-            else if (StreamOpFlag.SIZED.isKnown(flags))
-                return new SizedDoubleSortingSink(sink);
-            else
-                return new DoubleSortingSink(sink);
-        }
-
-        @Override
-        public <P_IN> Node<Double> evaluateParallel(PipelineHelper<P_IN, Double> helper,
-                                                    Spliterator<P_IN> spliterator,
-                                                    IntFunction<Double[]> generator) {
-            if (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags())) {
-                return helper.evaluate(spliterator, false, generator);
-            }
-            else {
-                Node.OfDouble n = (Node.OfDouble) helper.evaluate(spliterator, true, generator);
-
-                double[] content = n.asDoubleArray();
-                Arrays.parallelSort(content);
-
-                return Nodes.doubleNode(content);
-            }
-        }
-    }
-
-    @Override
-    public int getOpFlags() {
-        return StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SORTED;
-    }
-
-    @Override
-    public StreamShape outputShape() {
-        return shape;
-    }
-
-    @Override
-    public StreamShape inputShape() {
-        return shape;
-    }
-
-    /** {@link ForkJoinTask} for implementing sort on SIZED reference streams */
-    private static final class SizedRefSortingSink<T> extends Sink.ChainedReference<T> {
-        private final Comparator<? super T> comparator;
-        private T[] array;
-        private int offset;
-
-        SizedRefSortingSink(Sink sink, Comparator<? super T> comparator) {
-            super(sink);
-            this.comparator = comparator;
-        }
-
-        @Override
-        public void begin(long size) {
-            if (size >= Streams.MAX_ARRAY_SIZE)
-                throw new IllegalArgumentException("Stream size exceeds max array size");
-            array = (T[]) new Object[(int) size];
-        }
-
-        @Override
-        public void end() {
-            // Need to use offset rather than array.length since the downstream
-            // many be short-circuiting
-            // @@@ A better approach is to know if the downstream short-circuits
-            //     and check sink.cancellationRequested
-            Arrays.sort(array, 0, offset, comparator);
-            downstream.begin(offset);
-            for (int i = 0; i < offset; i++)
-                downstream.accept(array[i]);
-            downstream.end();
-            array = null;
-        }
-
-        @Override
-        public void accept(T t) {
-            array[offset++] = t;
-        }
-    }
-
-    /** {@link Sink} for implementing sort on reference streams */
-    private static final class RefSortingSink<T> extends Sink.ChainedReference<T> {
-        private final Comparator<? super T> comparator;
-        private ArrayList<T> list;
-
-        RefSortingSink(Sink sink, Comparator<? super T> comparator) {
-            super(sink);
-            this.comparator = comparator;
-        }
-
-        @Override
-        public void begin(long size) {
-            list = (size >= 0) ? new ArrayList<T>((int) size) : new ArrayList<T>();
-        }
-
-        @Override
-        public void end() {
-            list.sort(comparator);
-            downstream.begin(list.size());
-            list.forEach(downstream::accept);
-            downstream.end();
-            list = null;
-        }
-
-        @Override
-        public void accept(T t) {
-            list.add(t);
-        }
-    }
-
-    /** {@link Sink} for implementing sort on SIZED int streams */
-    private static final class SizedIntSortingSink extends Sink.ChainedInt {
-        private int[] array;
-        private int offset;
-
-        SizedIntSortingSink(Sink downstream) {
-            super(downstream);
-        }
-
-        @Override
-        public void begin(long size) {
-            if (size >= Streams.MAX_ARRAY_SIZE)
-                throw new IllegalArgumentException("Stream size exceeds max array size");
-            array = new int[(int) size];
-        }
-
-        @Override
-        public void end() {
-            Arrays.sort(array, 0, offset);
-            downstream.begin(offset);
-            for (int i = 0; i < offset; i++)
-                downstream.accept(array[i]);
-            downstream.end();
-            array = null;
-        }
-
-        @Override
-        public void accept(int t) {
-            array[offset++] = t;
-        }
-    }
-
-    /** {@link Sink} for implementing sort on int streams */
-    private static final class IntSortingSink extends Sink.ChainedInt {
-        private SpinedBuffer.OfInt b;
-
-        IntSortingSink(Sink sink) {
-            super(sink);
-        }
-
-        @Override
-        public void begin(long size) {
-            b = (size > 0) ? new SpinedBuffer.OfInt((int) size) : new SpinedBuffer.OfInt();
-        }
-
-        @Override
-        public void end() {
-            int[] ints = b.asIntArray();
-            Arrays.sort(ints);
-            downstream.begin(ints.length);
-            for (int anInt : ints)
-                downstream.accept(anInt);
-            downstream.end();
-        }
-
-        @Override
-        public void accept(int t) {
-            b.accept(t);
-        }
-    }
-
-    /** {@link Sink} for implementing sort on SIZED long streams */
-    private static final class SizedLongSortingSink extends Sink.ChainedLong {
-        private long[] array;
-        private int offset;
-
-        SizedLongSortingSink(Sink downstream) {
-            super(downstream);
-        }
-
-        @Override
-        public void begin(long size) {
-            if (size >= Streams.MAX_ARRAY_SIZE)
-                throw new IllegalArgumentException("Stream size exceeds max array size");
-            array = new long[(int) size];
-        }
-
-        @Override
-        public void end() {
-            Arrays.sort(array, 0, offset);
-            downstream.begin(offset);
-            for (int i = 0; i < offset; i++)
-                downstream.accept(array[i]);
-            downstream.end();
-            array = null;
-        }
-
-        @Override
-        public void accept(long t) {
-            array[offset++] = t;
-        }
-    }
-
-    /** {@link Sink} for implementing sort on long streams */
-    private static final class LongSortingSink extends Sink.ChainedLong {
-        private SpinedBuffer.OfLong b;
-
-        LongSortingSink(Sink sink) {
-            super(sink);
-        }
-
-        @Override
-        public void begin(long size) {
-            b = (size > 0) ? new SpinedBuffer.OfLong((int) size) : new SpinedBuffer.OfLong();
-        }
-
-        @Override
-        public void end() {
-            long[] longs = b.asLongArray();
-            Arrays.sort(longs);
-            downstream.begin(longs.length);
-            for (long aLong : longs)
-                downstream.accept(aLong);
-            downstream.end();
-        }
-
-        @Override
-        public void accept(long t) {
-            b.accept(t);
-        }
-    }
-
-    /** {@link Sink} for implementing sort on SIZED double streams */
-    private static final class SizedDoubleSortingSink extends Sink.ChainedDouble {
-        private double[] array;
-        private int offset;
-
-        SizedDoubleSortingSink(Sink downstream) {
-            super(downstream);
-        }
-
-        @Override
-        public void begin(long size) {
-            if (size >= Streams.MAX_ARRAY_SIZE)
-                throw new IllegalArgumentException("Stream size exceeds max array size");
-            array = new double[(int) size];
-        }
-
-        @Override
-        public void end() {
-            Arrays.sort(array, 0, offset);
-            downstream.begin(offset);
-            for (int i = 0; i < offset; i++)
-                downstream.accept(array[i]);
-            downstream.end();
-            array = null;
-        }
-
-        @Override
-        public void accept(double t) {
-            array[offset++] = t;
-        }
-    }
-
-    /** {@link Sink} for implementing sort on double streams */
-    private static final class DoubleSortingSink extends Sink.ChainedDouble {
-        private SpinedBuffer.OfDouble b;
-
-        DoubleSortingSink(Sink sink) {
-            super(sink);
-        }
-
-        @Override
-        public void begin(long size) {
-            b = (size > 0) ? new SpinedBuffer.OfDouble((int) size) : new SpinedBuffer.OfDouble();
-        }
-
-        @Override
-        public void end() {
-            double[] doubles = b.asDoubleArray();
-            Arrays.sort(doubles);
-            downstream.begin(doubles.length);
-            for (double aDouble : doubles)
-                downstream.accept(aDouble);
-            downstream.end();
-        }
-
-        @Override
-        public void accept(double t) {
-            b.accept(t);
-        }
-    }
-}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/share/classes/java/util/stream/SortedOps.java	Wed Apr 03 21:51:55 2013 +0200
@@ -0,0 +1,521 @@
+/*
+ * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.  Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+package java.util.stream;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Comparators;
+import java.util.Objects;
+import java.util.Spliterator;
+import java.util.concurrent.ForkJoinTask;
+import java.util.function.IntFunction;
+
+
+/**
+ * Factory methods for transforming streams into sorted streams.
+ *
+ * @since 1.8
+ */
+final class SortedOps {
+
+    private SortedOps() { }
+
+    /**
+     * Appends a "sorted" operation to the provided stream.
+     *
+     * @param <T> The type of both input and output elements
+     * @param upstream A reference stream with element type T
+     */
+    static<T> Stream<T> makeRef(AbstractPipeline<?, T, ?> upstream) {
+        return new OfRef<>(upstream);
+    }
+
+    /**
+     * Appends a "sorted" operation to the provided stream.
+     *
+     * @param <T> The type of both input and output elements
+     * @param upstream A reference stream with element type T
+     * @param comparator the comparator to order elements by
+     */
+    static<T> Stream<T> makeRef(AbstractPipeline<?, T, ?> upstream,
+                                Comparator<? super T> comparator) {
+        return new OfRef<>(upstream, comparator);
+    }
+
+    /**
+     * Appends a "sorted" operation to the provided stream.
+     *
+     * @param <T> The type of both input and output elements
+     * @param upstream An int stream
+     */
+    static<T> IntStream makeInt(AbstractPipeline<?, Integer, ?> upstream) {
+        return new OfInt(upstream);
+    }
+
+    /**
+     * Appends a "sorted" operation to the provided stream.
+     *
+     * @param <T> The type of both input and output elements
+     * @param upstream A long stream
+     */
+    static<T> LongStream makeLong(AbstractPipeline<?, Long, ?> upstream) {
+        return new OfLong(upstream);
+    }
+
+    /**
+     * Appends a "sorted" operation to the provided stream.
+     *
+     * @param <T> The type of both input and output elements
+     * @param upstream A double stream
+     */
+    static<T> DoubleStream makeDouble(AbstractPipeline<?, Double, ?> upstream) {
+        return new OfDouble(upstream);
+    }
+
+    /** Specialized subtype for sorting reference streams */
+    private static final class OfRef<T> extends ReferencePipeline.StatefulOp<T, T> {
+        /** Comparator used for sorting */
+        private final boolean isNaturalSort;
+        private final Comparator<? super T> comparator;
+
+        /**
+         * Sort using natural order of {@literal <T>} which must be
+         * {@code Comparable}.
+         */
+        OfRef(AbstractPipeline<?, T, ?> upstream) {
+            super(upstream, StreamShape.REFERENCE,
+                  StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SORTED);
+            this.isNaturalSort = true;
+            // Will throw CCE when we try to sort if T is not Comparable
+            this.comparator = (Comparator<? super T>) Comparators.naturalOrder();
+        }
+
+        /**
+         * Sort using the provided comparator.
+         *
+         * @param comparator The comparator to be used to evaluate ordering.
+         */
+        OfRef(AbstractPipeline<?, T, ?> upstream, Comparator<? super T> comparator) {
+            super(upstream, StreamShape.REFERENCE,
+                  StreamOpFlag.IS_ORDERED | StreamOpFlag.NOT_SORTED);
+            this.isNaturalSort = false;
+            this.comparator = Objects.requireNonNull(comparator);
+        }
+
+        @Override
+        public Sink<T> opWrapSink(int flags, Sink sink) {
+            Objects.requireNonNull(sink);
+
+            // If the input is already naturally sorted and this operation
+            // also sorts naturally then this is a no-op
+            if (StreamOpFlag.SORTED.isKnown(flags) && isNaturalSort)
+                return sink;
+            else if (StreamOpFlag.SIZED.isKnown(flags))
+                return new SizedRefSortingSink<>(sink, comparator);
+            else
+                return new RefSortingSink<>(sink, comparator);
+        }
+
+        @Override
+        public <P_IN> Node<T> opEvaluateParallel(PipelineHelper<T> helper,
+                                                 Spliterator<P_IN> spliterator,
+                                                 IntFunction<T[]> generator) {
+            // If the input is already naturally sorted and this operation
+            // naturally sorts then collect the output
+            if (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags()) && isNaturalSort) {
+                return helper.evaluate(spliterator, false, generator);
+            }
+            else {
+                // @@@ Weak two-pass parallel implementation; parallel collect, parallel sort
+                T[] flattenedData = helper.evaluate(spliterator, true, generator).asArray(generator);
+                Arrays.parallelSort(flattenedData, comparator);
+                return Nodes.node(flattenedData);
+            }
+        }
+    }
+
+    /** Specialized subtype for sorting int streams */
+    private static final class OfInt extends IntPipeline.StatefulOp<Integer> {
+        OfInt(AbstractPipeline<?, Integer, ?> upstream) {
+            super(upstream, StreamShape.INT_VALUE,
+                  StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SORTED);
+        }
+
+        @Override
+        public Sink<Integer> opWrapSink(int flags, Sink sink) {
+            Objects.requireNonNull(sink);
+
+            if (StreamOpFlag.SORTED.isKnown(flags))
+                return sink;
+            else if (StreamOpFlag.SIZED.isKnown(flags))
+                return new SizedIntSortingSink(sink);
+            else
+                return new IntSortingSink(sink);
+        }
+
+        @Override
+        public <P_IN> Node<Integer> opEvaluateParallel(PipelineHelper<Integer> helper,
+                                                       Spliterator<P_IN> spliterator,
+                                                       IntFunction<Integer[]> generator) {
+            if (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags())) {
+                return helper.evaluate(spliterator, false, generator);
+            }
+            else {
+                Node.OfInt n = (Node.OfInt) helper.evaluate(spliterator, true, generator);
+
+                int[] content = n.asIntArray();
+                Arrays.parallelSort(content);
+
+                return Nodes.intNode(content);
+            }
+        }
+    }
+
+    /** Specialized subtype for sorting long streams */
+    private static final class OfLong extends LongPipeline.StatefulOp<Long> {
+        OfLong(AbstractPipeline<?, Long, ?> upstream) {
+            super(upstream, StreamShape.LONG_VALUE,
+                  StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SORTED);
+        }
+
+        @Override
+        public Sink<Long> opWrapSink(int flags, Sink sink) {
+            Objects.requireNonNull(sink);
+
+            if (StreamOpFlag.SORTED.isKnown(flags))
+                return sink;
+            else if (StreamOpFlag.SIZED.isKnown(flags))
+                return new SizedLongSortingSink(sink);
+            else
+                return new LongSortingSink(sink);
+        }
+
+        @Override
+        public <P_IN> Node<Long> opEvaluateParallel(PipelineHelper<Long> helper,
+                                                    Spliterator<P_IN> spliterator,
+                                                    IntFunction<Long[]> generator) {
+            if (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags())) {
+                return helper.evaluate(spliterator, false, generator);
+            }
+            else {
+                Node.OfLong n = (Node.OfLong) helper.evaluate(spliterator, true, generator);
+
+                long[] content = n.asLongArray();
+                Arrays.parallelSort(content);
+
+                return Nodes.longNode(content);
+            }
+        }
+    }
+
+    /** Specialized subtype for sorting double streams */
+    private static final class OfDouble extends DoublePipeline.StatefulOp<Double> {
+        OfDouble(AbstractPipeline<?, Double, ?> upstream) {
+            super(upstream, StreamShape.DOUBLE_VALUE,
+                  StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SORTED);
+        }
+
+        @Override
+        public Sink<Double> opWrapSink(int flags, Sink sink) {
+            Objects.requireNonNull(sink);
+
+            if (StreamOpFlag.SORTED.isKnown(flags))
+                return sink;
+            else if (StreamOpFlag.SIZED.isKnown(flags))
+                return new SizedDoubleSortingSink(sink);
+            else
+                return new DoubleSortingSink(sink);
+        }
+
+        @Override
+        public <P_IN> Node<Double> opEvaluateParallel(PipelineHelper<Double> helper,
+                                                      Spliterator<P_IN> spliterator,
+                                                      IntFunction<Double[]> generator) {
+            if (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags())) {
+                return helper.evaluate(spliterator, false, generator);
+            }
+            else {
+                Node.OfDouble n = (Node.OfDouble) helper.evaluate(spliterator, true, generator);
+
+                double[] content = n.asDoubleArray();
+                Arrays.parallelSort(content);
+
+                return Nodes.doubleNode(content);
+            }
+        }
+    }
+
+    /** {@link Sink} for implementing sort on SIZED reference streams */
+    private static final class SizedRefSortingSink<T> extends Sink.ChainedReference<T> {
+        private final Comparator<? super T> comparator;
+        private T[] array;
+        private int offset;
+
+        SizedRefSortingSink(Sink sink, Comparator<? super T> comparator) {
+            super(sink);
+            this.comparator = comparator;
+        }
+
+        @Override
+        public void begin(long size) {
+            if (size >= Streams.MAX_ARRAY_SIZE)
+                throw new IllegalArgumentException("Stream size exceeds max array size");
+            array = (T[]) new Object[(int) size];
+        }
+
+        @Override
+        public void end() {
+            // Need to use offset rather than array.length since the downstream
+            // may be short-circuiting
+            // @@@ A better approach is to know if the downstream short-circuits
+            //     and check sink.cancellationRequested
+            Arrays.sort(array, 0, offset, comparator);
+            downstream.begin(offset);
+            for (int i = 0; i < offset; i++)
+                downstream.accept(array[i]);
+            downstream.end();
+            array = null;
+        }
+
+        @Override
+        public void accept(T t) {
+            array[offset++] = t;
+        }
+    }
+
+    /** {@link Sink} for implementing sort on reference streams */
+    private static final class RefSortingSink<T> extends Sink.ChainedReference<T> {
+        private final Comparator<? super T> comparator;
+        private ArrayList<T> list;
+
+        RefSortingSink(Sink sink, Comparator<? super T> comparator) {
+            super(sink);
+            this.comparator = comparator;
+        }
+
+        @Override
+        public void begin(long size) {
+            list = (size >= 0) ? new ArrayList<T>((int) size) : new ArrayList<T>();
+        }
+
+        @Override
+        public void end() {
+            list.sort(comparator);
+            downstream.begin(list.size());
+            list.forEach(downstream::accept);
+            downstream.end();
+            list = null;
+        }
+
+        @Override
+        public void accept(T t) {
+            list.add(t);
+        }
+    }
+
+    /** {@link Sink} for implementing sort on SIZED int streams */
+    private static final class SizedIntSortingSink extends Sink.ChainedInt {
+        private int[] array;
+        private int offset;
+
+        SizedIntSortingSink(Sink downstream) {
+            super(downstream);
+        }
+
+        @Override
+        public void begin(long size) {
+            if (size >= Streams.MAX_ARRAY_SIZE)
+                throw new IllegalArgumentException("Stream size exceeds max array size");
+            array = new int[(int) size];
+        }
+
+        @Override
+        public void end() {
+            Arrays.sort(array, 0, offset);
+            downstream.begin(offset);
+            for (int i = 0; i < offset; i++)
+                downstream.accept(array[i]);
+            downstream.end();
+            array = null;
+        }
+
+        @Override
+        public void accept(int t) {
+            array[offset++] = t;
+        }
+    }
+
+    /** {@link Sink} for implementing sort on int streams */
+    private static final class IntSortingSink extends Sink.ChainedInt {
+        private SpinedBuffer.OfInt b;
+
+        IntSortingSink(Sink sink) {
+            super(sink);
+        }
+
+        @Override
+        public void begin(long size) {
+            b = (size > 0) ? new SpinedBuffer.OfInt((int) size) : new SpinedBuffer.OfInt();
+        }
+
+        @Override
+        public void end() {
+            int[] ints = b.asIntArray();
+            Arrays.sort(ints);
+            downstream.begin(ints.length);
+            for (int anInt : ints)
+                downstream.accept(anInt);
+            downstream.end();
+        }
+
+        @Override
+        public void accept(int t) {
+            b.accept(t);
+        }
+    }
+
+    /** {@link Sink} for implementing sort on SIZED long streams */
+    private static final class SizedLongSortingSink extends Sink.ChainedLong {
+        private long[] array;
+        private int offset;
+
+        SizedLongSortingSink(Sink downstream) {
+            super(downstream);
+        }
+
+        @Override
+        public void begin(long size) {
+            if (size >= Streams.MAX_ARRAY_SIZE)
+                throw new IllegalArgumentException("Stream size exceeds max array size");
+            array = new long[(int) size];
+        }
+
+        @Override
+        public void end() {
+            Arrays.sort(array, 0, offset);
+            downstream.begin(offset);
+            for (int i = 0; i < offset; i++)
+                downstream.accept(array[i]);
+            downstream.end();
+            array = null;
+        }
+
+        @Override
+        public void accept(long t) {
+            array[offset++] = t;
+        }
+    }
+
+    /** {@link Sink} for implementing sort on long streams */
+    private static final class LongSortingSink extends Sink.ChainedLong {
+        private SpinedBuffer.OfLong b;
+
+        LongSortingSink(Sink sink) {
+            super(sink);
+        }
+
+        @Override
+        public void begin(long size) {
+            b = (size > 0) ? new SpinedBuffer.OfLong((int) size) : new SpinedBuffer.OfLong();
+        }
+
+        @Override
+        public void end() {
+            long[] longs = b.asLongArray();
+            Arrays.sort(longs);
+            downstream.begin(longs.length);
+            for (long aLong : longs)
+                downstream.accept(aLong);
+            downstream.end();
+        }
+
+        @Override
+        public void accept(long t) {
+            b.accept(t);
+        }
+    }
+
+    /** {@link Sink} for implementing sort on SIZED double streams */
+    private static final class SizedDoubleSortingSink extends Sink.ChainedDouble {
+        private double[] array;
+        private int offset;
+
+        SizedDoubleSortingSink(Sink downstream) {
+            super(downstream);
+        }
+
+        @Override
+        public void begin(long size) {
+            if (size >= Streams.MAX_ARRAY_SIZE)
+                throw new IllegalArgumentException("Stream size exceeds max array size");
+            array = new double[(int) size];
+        }
+
+        @Override
+        public void end() {
+            Arrays.sort(array, 0, offset);
+            downstream.begin(offset);
+            for (int i = 0; i < offset; i++)
+                downstream.accept(array[i]);
+            downstream.end();
+            array = null;
+        }
+
+        @Override
+        public void accept(double t) {
+            array[offset++] = t;
+        }
+    }
+
+    /** {@link Sink} for implementing sort on double streams */
+    private static final class DoubleSortingSink extends Sink.ChainedDouble {
+        private SpinedBuffer.OfDouble b;
+
+        DoubleSortingSink(Sink sink) {
+            super(sink);
+        }
+
+        @Override
+        public void begin(long size) {
+            b = (size > 0) ? new SpinedBuffer.OfDouble((int) size) : new SpinedBuffer.OfDouble();
+        }
+
+        @Override
+        public void end() {
+            double[] doubles = b.asDoubleArray();
+            Arrays.sort(doubles);
+            downstream.begin(doubles.length);
+            for (double aDouble : doubles)
+                downstream.accept(aDouble);
+            downstream.end();
+        }
+
+        @Override
+        public void accept(double t) {
+            b.accept(t);
+        }
+    }
+}
--- a/src/share/classes/java/util/stream/StatefulOp.java	Wed Apr 03 11:29:23 2013 -0700
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,78 +0,0 @@
-/*
- * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This code is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 only, as
- * published by the Free Software Foundation.  Oracle designates this
- * particular file as subject to the "Classpath" exception as provided
- * by Oracle in the LICENSE file that accompanied this code.
- *
- * This code is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
- * version 2 for more details (a copy is included in the LICENSE file that
- * accompanied this code).
- *
- * You should have received a copy of the GNU General Public License version
- * 2 along with this work; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
- *
- * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
- * or visit www.oracle.com if you need additional information or have any
- * questions.
- */
-package java.util.stream;
-
-import java.util.Spliterator;
-import java.util.function.IntFunction;
-
-/**
- * A stateful intermediate stream operation ({@link IntermediateOp}).
- * <em>Stateful</em> means that state is accumulated as elements are processed.
- * Examples of stateful operations are sorting, extracting a subsequence of the
- * input, or removing duplicates.  Statefulness has an effect on how the
- * operation can be parallelized.  Stateless operations parallelize trivially
- * because they are homomorphisms under concatenation:
- *
- * <pre>
- *     statelessOp(a || b) = statelessOp(a) || statelessOp(b)
- * </pre>
- *
- * where {@code ||} denotes concatenation.  Stateful operations may still be
- * parallelizable, but are not amenable to the automatic parallelization of
- * stateless operations.  Accordingly, a stateful operation must provide its
- * own parallel execution implementation
- * ({@link IntermediateOp#evaluateParallel(PipelineHelper, java.util.Spliterator, java.util.function.IntFunction)})
- * as well as {@link IntermediateOp#wrapSink(int, Sink)}.
- *
- * @param <E> Type of input and output elements.
- *
- * @see IntermediateOp
- * @see TerminalOp
- * @since 1.8
- */
-interface StatefulOp<E> extends IntermediateOp<E, E> {
-
-    /**
-     * Returns {@code true}.  Any overriding implementations must also return
-     * {@code true}
-     * @implSpec The default implementation returns {@code true}
-     * @return {@code true}
-     */
-    @Override
-    default boolean isStateful() {
-        return true;
-    }
-
-    /**
-     * {@inheritDoc}
-     *
-     * @apiNote
-     * An implementation of this method must be provided, but it is acceptable
-     * if the implementation is sequential.  A generic sequential implementation
-     * is available as
-     * {@link PipelineHelper#evaluateSequential(IntermediateOp, java.util.Spliterator, java.util.function.IntFunction)}.
-     */
-    <P_IN> Node<E> evaluateParallel(PipelineHelper<P_IN, E> helper, Spliterator<P_IN> spliterator, IntFunction<E[]> generator);
-}
--- a/src/share/classes/java/util/stream/StreamSpliterators.java	Wed Apr 03 11:29:23 2013 -0700
+++ b/src/share/classes/java/util/stream/StreamSpliterators.java	Wed Apr 03 21:51:55 2013 +0200
@@ -51,7 +51,7 @@
         // True if this spliterator supports splitting
         final boolean isParallel;
 
-        final PipelineHelper<P_IN, P_OUT> ph;
+        final PipelineHelper<P_OUT> ph;
 
         Supplier<Spliterator<P_IN>> supplier;
 
@@ -82,14 +82,14 @@
         // When buffer is not null there still may be elements in the buffer to be consumed
         boolean finished;
 
-        AbstractWrappingSpliterator(PipelineHelper<P_IN, P_OUT> ph, Supplier<Spliterator<P_IN>> supplier, boolean parallel) {
+        AbstractWrappingSpliterator(PipelineHelper<P_OUT> ph, Supplier<Spliterator<P_IN>> supplier, boolean parallel) {
             this.ph = ph;
             this.supplier = supplier;
             this.spliterator = null;
             this.isParallel = parallel;
         }
 
-        AbstractWrappingSpliterator(PipelineHelper<P_IN, P_OUT> ph, Spliterator<P_IN> spliterator, boolean parallel) {
+        AbstractWrappingSpliterator(PipelineHelper<P_OUT> ph, Spliterator<P_IN> spliterator, boolean parallel) {
             this.ph = ph;
             this.supplier = null;
             this.spliterator = spliterator;
@@ -203,11 +203,11 @@
     static final class WrappingSpliterator<P_IN, P_OUT>
             extends AbstractWrappingSpliterator<P_IN, P_OUT, SpinedBuffer<P_OUT>> {
 
-        WrappingSpliterator(PipelineHelper<P_IN, P_OUT> ph, Supplier<Spliterator<P_IN>> supplier, boolean parallel) {
+        WrappingSpliterator(PipelineHelper<P_OUT> ph, Supplier<Spliterator<P_IN>> supplier, boolean parallel) {
             super(ph, supplier, parallel);
         }
 
-        WrappingSpliterator(PipelineHelper<P_IN, P_OUT> ph, Spliterator<P_IN> spliterator, boolean parallel) {
+        WrappingSpliterator(PipelineHelper<P_OUT> ph, Spliterator<P_IN> spliterator, boolean parallel) {
             super(ph, spliterator, parallel);
         }
 
@@ -250,11 +250,11 @@
             extends AbstractWrappingSpliterator<P_IN, Integer, SpinedBuffer.OfInt>
             implements Spliterator.OfInt {
 
-        IntWrappingSpliterator(PipelineHelper<P_IN, Integer> ph, Supplier<Spliterator<P_IN>> supplier, boolean parallel) {
+        IntWrappingSpliterator(PipelineHelper<Integer> ph, Supplier<Spliterator<P_IN>> supplier, boolean parallel) {
             super(ph, supplier, parallel);
         }
 
-        IntWrappingSpliterator(PipelineHelper<P_IN, Integer> ph, Spliterator<P_IN> spliterator, boolean parallel) {
+        IntWrappingSpliterator(PipelineHelper<Integer> ph, Spliterator<P_IN> spliterator, boolean parallel) {
             super(ph, spliterator, parallel);
         }
 
@@ -302,11 +302,11 @@
             extends AbstractWrappingSpliterator<P_IN, Long, SpinedBuffer.OfLong>
             implements Spliterator.OfLong {
 
-        LongWrappingSpliterator(PipelineHelper<P_IN, Long> ph, Supplier<Spliterator<P_IN>> supplier, boolean parallel) {
+        LongWrappingSpliterator(PipelineHelper<Long> ph, Supplier<Spliterator<P_IN>> supplier, boolean parallel) {
             super(ph, supplier, parallel);
         }
 
-        LongWrappingSpliterator(PipelineHelper<P_IN, Long> ph, Spliterator<P_IN> spliterator, boolean parallel) {
+        LongWrappingSpliterator(PipelineHelper<Long> ph, Spliterator<P_IN> spliterator, boolean parallel) {
             super(ph, spliterator, parallel);
         }
 
@@ -354,11 +354,11 @@
             extends AbstractWrappingSpliterator<P_IN, Double, SpinedBuffer.OfDouble>
             implements Spliterator.OfDouble {
 
-        DoubleWrappingSpliterator(PipelineHelper<P_IN, Double> ph, Supplier<Spliterator<P_IN>> supplier, boolean parallel) {
+        DoubleWrappingSpliterator(PipelineHelper<Double> ph, Supplier<Spliterator<P_IN>> supplier, boolean parallel) {
             super(ph, supplier, parallel);
         }
 
-        DoubleWrappingSpliterator(PipelineHelper<P_IN, Double> ph, Spliterator<P_IN> spliterator, boolean parallel) {
+        DoubleWrappingSpliterator(PipelineHelper<Double> ph, Spliterator<P_IN> spliterator, boolean parallel) {
             super(ph, spliterator, parallel);
         }
 
--- a/src/share/classes/java/util/stream/Streams.java	Wed Apr 03 11:29:23 2013 -0700
+++ b/src/share/classes/java/util/stream/Streams.java	Wed Apr 03 21:51:55 2013 +0200
@@ -108,10 +108,11 @@
      * @param <T> Type of elements
      * @return A new sequential {@code Stream}
      */
-    public static<T> Stream<T> stream(Supplier<? extends Spliterator<T>> supplier, int characteristics) {
+    public static<T> Stream<T> stream(Supplier<? extends Spliterator<T>> supplier,
+                                      int characteristics) {
         Objects.requireNonNull(supplier);
-        return new ReferencePipeline<>(supplier,
-                                       StreamOpFlag.fromCharacteristics(characteristics) & ~StreamOpFlag.IS_PARALLEL);
+        return new ReferencePipeline.Head<>(supplier,
+                                            StreamOpFlag.fromCharacteristics(characteristics) & ~StreamOpFlag.IS_PARALLEL);
     }
 
     /**
@@ -140,10 +141,11 @@
      * @param <T> Type of elements
      * @return A new parallel {@code Stream}
      */
-    public static<T> Stream<T> parallelStream(Supplier<? extends Spliterator<T>> supplier, int characteristics) {
+    public static<T> Stream<T> parallelStream(Supplier<? extends Spliterator<T>> supplier,
+                                              int characteristics) {
         Objects.requireNonNull(supplier);
-        return new ReferencePipeline<>(supplier,
-                                       StreamOpFlag.fromCharacteristics(characteristics) | StreamOpFlag.IS_PARALLEL);
+        return new ReferencePipeline.Head<>(supplier,
+                                            StreamOpFlag.fromCharacteristics(characteristics) | StreamOpFlag.IS_PARALLEL);
     }
 
     /**
@@ -170,8 +172,8 @@
      */
     public static<T> Stream<T> stream(Spliterator<T> spliterator) {
         Objects.requireNonNull(spliterator);
-        return new ReferencePipeline<>(spliterator,
-                                       StreamOpFlag.fromCharacteristics(spliterator) & ~StreamOpFlag.IS_PARALLEL);
+        return new ReferencePipeline.Head<>(spliterator,
+                                            StreamOpFlag.fromCharacteristics(spliterator) & ~StreamOpFlag.IS_PARALLEL);
     }
 
     /**
@@ -198,8 +200,8 @@
      */
     public static<T> Stream<T> parallelStream(Spliterator<T> spliterator) {
         Objects.requireNonNull(spliterator);
-        return new ReferencePipeline<>(spliterator,
-                                       StreamOpFlag.fromCharacteristics(spliterator) | StreamOpFlag.IS_PARALLEL);
+        return new ReferencePipeline.Head<>(spliterator,
+                                            StreamOpFlag.fromCharacteristics(spliterator) | StreamOpFlag.IS_PARALLEL);
     }
 
     // IntStream construction
@@ -238,9 +240,10 @@
      *        {@code source.get().getCharacteristics()}
      * @return A new sequential {@code IntStream}
      */
-    public static IntStream intStream(Supplier<? extends Spliterator.OfInt> supplier, int characteristics) {
-        return new IntPipeline<>(supplier,
-                                 StreamOpFlag.fromCharacteristics(characteristics) & ~StreamOpFlag.IS_PARALLEL);
+    public static IntStream intStream(Supplier<? extends Spliterator.OfInt> supplier,
+                                      int characteristics) {
+        return new IntPipeline.Head<>(supplier,
+                                      StreamOpFlag.fromCharacteristics(characteristics) & ~StreamOpFlag.IS_PARALLEL);
     }
 
     /**
@@ -268,9 +271,10 @@
      *        {@code source.get().getCharacteristics()}
      * @return A new parallel {@code IntStream}
      */
-    public static IntStream intParallelStream(Supplier<? extends Spliterator.OfInt> supplier, int characteristics) {
-        return new IntPipeline<>(supplier,
-                                 StreamOpFlag.fromCharacteristics(characteristics) | StreamOpFlag.IS_PARALLEL);
+    public static IntStream intParallelStream(Supplier<? extends Spliterator.OfInt> supplier,
+                                              int characteristics) {
+        return new IntPipeline.Head<>(supplier,
+                                      StreamOpFlag.fromCharacteristics(characteristics) | StreamOpFlag.IS_PARALLEL);
     }
 
     /**
@@ -296,7 +300,7 @@
      * @return A new sequential {@code IntStream}
      */
     public static IntStream intStream(Spliterator.OfInt spliterator) {
-        return new IntPipeline<>(spliterator, spliterator.characteristics() & ~StreamOpFlag.IS_PARALLEL);
+        return new IntPipeline.Head<>(spliterator, spliterator.characteristics() & ~StreamOpFlag.IS_PARALLEL);
     }
 
     /**
@@ -322,8 +326,8 @@
      * @return A new parallel {@code IntStream}
      */
     public static IntStream intParallelStream(Spliterator.OfInt spliterator) {
-        return new IntPipeline<>(spliterator,
-                                 StreamOpFlag.fromCharacteristics(spliterator.characteristics()) | StreamOpFlag.IS_PARALLEL);
+        return new IntPipeline.Head<>(spliterator,
+                                      StreamOpFlag.fromCharacteristics(spliterator.characteristics()) | StreamOpFlag.IS_PARALLEL);
     }
 
     // LongStream construction
@@ -362,9 +366,10 @@
      *        {@code source.get().getCharacteristics()}
      * @return A new sequential {@code LongStream}
      */
-    public static LongStream longStream(Supplier<? extends Spliterator.OfLong> supplier, int characteristics) {
-        return new LongPipeline<>(supplier,
-                                  StreamOpFlag.fromCharacteristics(characteristics) & ~StreamOpFlag.IS_PARALLEL);
+    public static LongStream longStream(Supplier<? extends Spliterator.OfLong> supplier,
+                                        int characteristics) {
+        return new LongPipeline.Head<>(supplier,
+                                       StreamOpFlag.fromCharacteristics(characteristics) & ~StreamOpFlag.IS_PARALLEL);
     }
 
     /**
@@ -392,9 +397,10 @@
      *        {@code source.get().getCharacteristics()}
      * @return A new parallel {@code LongStream}
      */
-    public static LongStream longParallelStream(Supplier<? extends Spliterator.OfLong> supplier, int characteristics) {
-        return new LongPipeline<>(supplier,
-                                  StreamOpFlag.fromCharacteristics(characteristics) | StreamOpFlag.IS_PARALLEL);
+    public static LongStream longParallelStream(Supplier<? extends Spliterator.OfLong> supplier,
+                                                int characteristics) {
+        return new LongPipeline.Head<>(supplier,
+                                       StreamOpFlag.fromCharacteristics(characteristics) | StreamOpFlag.IS_PARALLEL);
     }
 
     /**
@@ -420,8 +426,8 @@
      * @return A new sequential {@code LongStream}
      */
     public static LongStream longStream(Spliterator.OfLong spliterator) {
-        return new LongPipeline<>(spliterator,
-                                  StreamOpFlag.fromCharacteristics(spliterator.characteristics()) & ~StreamOpFlag.IS_PARALLEL);
+        return new LongPipeline.Head<>(spliterator,
+                                       StreamOpFlag.fromCharacteristics(spliterator.characteristics()) & ~StreamOpFlag.IS_PARALLEL);
     }
 
     /**
@@ -447,8 +453,8 @@
      * @return A new parallel {@code LongStream}
      */
     public static LongStream longParallelStream(Spliterator.OfLong spliterator) {
-        return new LongPipeline<>(spliterator,
-                                  StreamOpFlag.fromCharacteristics(spliterator.characteristics()) | StreamOpFlag.IS_PARALLEL);
+        return new LongPipeline.Head<>(spliterator,
+                                       StreamOpFlag.fromCharacteristics(spliterator.characteristics()) | StreamOpFlag.IS_PARALLEL);
     }
 
     // DoubleStream construction
@@ -489,8 +495,8 @@
      */
     public static DoubleStream doubleStream(Supplier<? extends Spliterator.OfDouble> supplier,
                                             int characteristics) {
-        return new DoublePipeline<>(supplier,
-                                    StreamOpFlag.fromCharacteristics(characteristics) & ~StreamOpFlag.IS_PARALLEL);
+        return new DoublePipeline.Head<>(supplier,
+                                         StreamOpFlag.fromCharacteristics(characteristics) & ~StreamOpFlag.IS_PARALLEL);
     }
 
     /**
@@ -520,8 +526,8 @@
      */
     public static DoubleStream doubleParallelStream(Supplier<? extends Spliterator.OfDouble> supplier,
                                                     int characteristics) {
-        return new DoublePipeline<>(supplier,
-                                    StreamOpFlag.fromCharacteristics(characteristics) | StreamOpFlag.IS_PARALLEL);
+        return new DoublePipeline.Head<>(supplier,
+                                         StreamOpFlag.fromCharacteristics(characteristics) | StreamOpFlag.IS_PARALLEL);
     }
 
     /**
@@ -547,8 +553,8 @@
      * @return A new sequential {@code DoubleStream}
      */
     public static DoubleStream doubleStream(Spliterator.OfDouble spliterator) {
-        return new DoublePipeline<>(spliterator,
-                                    StreamOpFlag.fromCharacteristics(spliterator.characteristics()) & ~StreamOpFlag.IS_PARALLEL);
+        return new DoublePipeline.Head<>(spliterator,
+                                         StreamOpFlag.fromCharacteristics(spliterator.characteristics()) & ~StreamOpFlag.IS_PARALLEL);
     }
 
     /**
@@ -574,8 +580,8 @@
      * @return A new parallel {@code DoubleStream}
      */
     public static DoubleStream doubleParallelStream(Spliterator.OfDouble spliterator) {
-        return new DoublePipeline<>(spliterator,
-                                    StreamOpFlag.fromCharacteristics(spliterator.characteristics()) | StreamOpFlag.IS_PARALLEL);
+        return new DoublePipeline.Head<>(spliterator,
+                                         StreamOpFlag.fromCharacteristics(spliterator.characteristics()) | StreamOpFlag.IS_PARALLEL);
     }
 
     // Infinite Stream generators
--- a/src/share/classes/java/util/stream/TerminalOp.java	Wed Apr 03 11:29:23 2013 -0700
+++ b/src/share/classes/java/util/stream/TerminalOp.java	Wed Apr 03 21:51:55 2013 +0200
@@ -40,8 +40,6 @@
  *
  * @param <E_IN> The type of input elements
  * @param <R>    The type of the result
- * @see StatefulOp
- * @see IntermediateOp
  * @since 1.8
  */
 interface TerminalOp<E_IN, R> {
@@ -77,7 +75,7 @@
      * @param spliterator the source spliterator
      * @return the result of the evaluation
      */
-    default <P_IN> R evaluateParallel(PipelineHelper<P_IN, E_IN> helper,
+    default <P_IN> R evaluateParallel(PipelineHelper<E_IN> helper,
                                       Spliterator<P_IN> spliterator) {
         if (Tripwire.ENABLED)
             Tripwire.trip(getClass(), "{0} triggering TerminalOp.evaluateParallel serial default");
@@ -93,6 +91,6 @@
      * @param spliterator the source spliterator
      * @return the result of the evaluation
      */
-    <P_IN> R evaluateSequential(PipelineHelper<P_IN, E_IN> helper,
+    <P_IN> R evaluateSequential(PipelineHelper<E_IN> helper,
                                 Spliterator<P_IN> spliterator);
 }
--- a/test-ng/bootlib/java/util/stream/CollectorOps.java	Wed Apr 03 11:29:23 2013 -0700
+++ b/test-ng/bootlib/java/util/stream/CollectorOps.java	Wed Apr 03 21:51:55 2013 +0200
@@ -60,19 +60,19 @@
         }
 
         @Override
-        public int getOpFlags() {
+        public int opGetFlags() {
             return opFlags;
         }
 
         @Override
-        public Sink<E_IN> wrapSink(int flags, Sink<E_IN> sink) {
+        public Sink<E_IN> opWrapSink(int flags, Sink<E_IN> sink) {
             return sink;
         }
 
         @Override
-        public <P_IN> Node<E_IN> evaluateParallel(PipelineHelper<P_IN, E_IN> helper,
-                                                  Spliterator<P_IN> spliterator,
-                                                  IntFunction<E_IN[]> generator) {
+        public <P_IN> Node<E_IN> opEvaluateParallel(PipelineHelper<E_IN> helper,
+                                                    Spliterator<P_IN> spliterator,
+                                                    IntFunction<E_IN[]> generator) {
             return helper.evaluate(spliterator, false, generator);
         }
     }
@@ -87,13 +87,13 @@
         }
 
         @Override
-        public <P_IN> Node<T> evaluateParallel(PipelineHelper<P_IN, T> helper,
-                                               Spliterator<P_IN> spliterator,
-                                               IntFunction<T[]> generator) {
+        public <P_IN> Node<T> opEvaluateParallel(PipelineHelper<T> helper,
+                                                 Spliterator<P_IN> spliterator,
+                                                 IntFunction<T[]> generator) {
             int flags = helper.getStreamAndOpFlags();
 
             Assert.assertTrue(StreamOpFlag.SIZED.isKnown(flags));
-            return super.evaluateParallel(helper, spliterator, generator);
+            return super.opEvaluateParallel(helper, spliterator, generator);
         }
 
         public static class OfInt extends TestParallelSizedOp<Integer> {
--- a/test-ng/bootlib/java/util/stream/FlagDeclaringOp.java	Wed Apr 03 11:29:23 2013 -0700
+++ b/test-ng/bootlib/java/util/stream/FlagDeclaringOp.java	Wed Apr 03 21:51:55 2013 +0200
@@ -53,12 +53,12 @@
     }
 
     @Override
-    public int getOpFlags() {
+    public int opGetFlags() {
         return flags;
     }
 
     @Override
-    public Sink<T> wrapSink(int flags, Sink sink) {
+    public Sink<T> opWrapSink(int flags, Sink sink) {
         return sink;
     }
 }
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/test-ng/bootlib/java/util/stream/IntermediateOp.java	Wed Apr 03 21:51:55 2013 +0200
@@ -0,0 +1,235 @@
+/*
+ * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.  Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+package java.util.stream;
+
+import java.util.Spliterator;
+import java.util.function.IntFunction;
+
+/**
+ * An operation in a stream pipeline that takes a stream as input and produces
+ * a stream, possibly of a different type, as output.  An intermediate operation
+ * has an input type and an output type, reflected in its type parameters
+ * {@code E_IN} and {@code E_OUT}, and an associated input shape and
+ * output shape.  An intermediate operation also has a set of <em>operation
+ * flags</em> that describes how it transforms characteristics of the stream
+ * (such as sortedness or size; see {@link StreamOpFlag}).
+ *
+ * <p>Intermediate operations are implemented in terms of <em>sink transforms
+ * </em>; given a {@code Sink} for the output type of the operation, produce a
+ * {@code Sink} for the input type of the operation, which, when fed with
+ * values, has the effect of implementing the desired operation on the input
+ * values and feeding them to the output sink.
+ *
+ * <p>Some intermediate operations are <em>stateful</em>.  This means that the
+ * sinks they produce as a result of the above wrapping may maintain state from
+ * processing earlier elements.  Stateful intermediate operations must implement
+ * the {@link StatefulOp} interface.  Statefulness has an effect on how the
+ * operation can be parallelized.  Stateless operations parallelize trivially
+ * because they are homomorphisms under concatenation:
+ *
+ * <pre>
+ *     statelessOp(a || b) = statelessOp(a) || statelessOp(b)
+ * </pre>
+ *
+ * where {@code ||} denotes concatenation.  Stateful operations may still be
+ * parallelizable, but are not amenable to the automatic parallelization of
+ * stateless operations.  Accordingly, a stateful operation must provide its own
+ * parallel execution implementation
+ * ({@link IntermediateOp#opEvaluateParallel(PipelineHelper, java.util.Spliterator, java.util.function.IntFunction)}).
+ *
+ * @apiNote
+ * As an example, consider the stream pipeline:
+ * <pre>
+ *     int oldestBob = people.stream()
+ *                            .filter(p -> p.getFirstName.equals("Bob"))
+ *                            .mapToInt(p -> p.getAge())
+ *                            .max();
+ * </pre>
+ *
+ * <p>This pipeline has two intermediate operations, filter and map.  The
+ * filtering operation has input and output types of {@code Person} (with input
+ * and output shape of {@code REFERENCE}), and the mapping operation has an
+ * input type of {@code Person} and an output type of {@code Integer} (with
+ * shape {@code INT_VALUE}.)  When we construct a sink chain, the mapping
+ * operation will be asked to transform a {@code Sink.OfInt} which computes the
+ * maximum value into a {@code Sink} which accepts {@code Person} objects, and
+ * whose behavior is to take the supplied {@code Person}, call {@code getAge()}
+ * on it, and pass the resulting value to the downstream sink.  This sink
+ * transform might be implemented as:
+ *
+ * <pre>
+ *     new Sink.ChainedReference<U>(sink) {
+ *         public void accept(U u) {
+ *             downstream.accept(mappingFunction.applyAsInt(u));
+ *         }
+ *     }
+ * </pre>
+ *
+ * @param <E_IN>  Type of input elements to the operation
+ * @param <E_OUT> Type of output elements to the operation
+ * @see TerminalOp
+ * @see StatefulOp
+ * @since 1.8
+ */
+interface IntermediateOp<E_IN, E_OUT> {
+
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    public static<T> AbstractPipeline chain(AbstractPipeline upstream,
+                                            IntermediateOp<?, T> op) {
+        if (op instanceof StatefulOp)
+            return StatefulOp.chain(upstream, (StatefulOp) op);
+        switch (op.outputShape()) {
+            case REFERENCE:
+                return new ReferencePipeline.StatelessOp<Object, T>(upstream, op.inputShape(), op.opGetFlags()) {
+                    public Sink opWrapSink(int flags, Sink<T> sink) {
+                        return op.opWrapSink(flags, sink);
+                    }
+                };
+            case INT_VALUE:
+                return new IntPipeline.StatelessOp<Object>(upstream, op.inputShape(), op.opGetFlags()) {
+                    public Sink opWrapSink(int flags, Sink sink) {
+                        return op.opWrapSink(flags, sink);
+                    }
+                };
+            case LONG_VALUE:
+                return new LongPipeline.StatelessOp<Object>(upstream, op.inputShape(), op.opGetFlags()) {
+                    @Override
+                    Sink opWrapSink(int flags, Sink sink) {
+                        return op.opWrapSink(flags, sink);
+                    }
+                };
+            case DOUBLE_VALUE:
+                return new DoublePipeline.StatelessOp<Object>(upstream, op.inputShape(), op.opGetFlags()) {
+                    @Override
+                    Sink opWrapSink(int flags, Sink sink) {
+                        return op.opWrapSink(flags, sink);
+                    }
+                };
+            default: throw new IllegalStateException(op.outputShape().toString());
+        }
+    }
+
+
+    /**
+     * Gets the shape of the input type of this operation
+     *
+     * @implSpec The default returns {@code StreamShape.REFERENCE}
+     * @return Shape of the input type of this operation
+     */
+    default StreamShape inputShape() { return StreamShape.REFERENCE; }
+
+    /**
+     * Gets the shape of the output type of this operation
+     *
+     * @implSpec The default returns {@code StreamShape.REFERENCE}
+     * @return Shape of the output type of this operation
+     */
+    default StreamShape outputShape() { return StreamShape.REFERENCE; }
+
+    /**
+     * Gets the operation flags of this operation.
+     *
+     * @implSpec The default returns {@code 0}
+     * @return a bitmap describing the operation flags of this operation
+     * @see StreamOpFlag
+     */
+    default int opGetFlags() { return 0; }
+
+    /**
+     * Returns whether this operation is stateful or not.  If it is stateful,
+     * then the method
+     * {@link #opEvaluateParallel(PipelineHelper, java.util.Spliterator, java.util.function.IntFunction)}
+     * must be overridden.
+     *
+     * @implSpec The default implementation returns {@code false}.
+     * @return {@code true} if this operation is stateful
+     */
+    default boolean opIsStateful() { return false; }
+
+    /**
+     * Accepts a {@code Sink} which will receive the results of this operation,
+     * and return a {@code Sink} which accepts elements of the input type of
+     * this operation and which performs the operation, passing the results to
+     * the provided {@code Sink}.
+     *
+     * <p>The implementation may use the {@code flags} parameter to optimize the
+     * sink wrapping.  For example, if the input is already {@code DISTINCT},
+     * the implementation for the {@code Stream#distinct()} method could just
+     * return the sink it was passed.
+     *
+     * @param flags The combined stream and operation flags up to, but not
+     *        including, this operation.
+     * @param sink elements will be sent to this sink after the processing.
+     * @return a sink which will accept elements and perform the operation upon
+     *         each element, passing the results (if any) to the provided
+     *         {@code Sink}.
+     */
+    Sink<E_IN> opWrapSink(int flags, Sink<E_OUT> sink);
+
+    /**
+     * Performs a parallel evaluation of the operation using the specified
+     * {@code PipelineHelper} which describes the stream source and upstream
+     * intermediate operations.  Only called on stateful operations.  If
+     * {@link #opIsStateful()} returns true then implementations must override the
+     * default implementation.
+     *
+     * @implSpec The default implementation throws an
+     * {@link UnsupportedOperationException}
+     *
+     * @param helper the pipeline helper
+     * @param spliterator the source {@code Spliterator}
+     * @param generator the array generator
+     * @return a {@code Node} describing the result of the evaluation
+     */
+    default <P_IN> Node<E_OUT> opEvaluateParallel(PipelineHelper<E_OUT> helper,
+                                                  Spliterator<P_IN> spliterator,
+                                                  IntFunction<E_OUT[]> generator) {
+        throw new UnsupportedOperationException("Parallel evaluation is not supported");
+    }
+
+    /**
+     * Returns a {@code Spliterator} describing a parallel evaluation of the operation using
+     * the specified {@code PipelineHelper} which describes the stream source and upstream
+     * intermediate operations.  Only called on stateful operations.  It is not necessary
+     * (though acceptable) to do a full computation of the result here; it is preferable, if
+     * possible, to describe the result via a lazily evaluated spliterator.
+     *
+     * @implSpec The default implementation behaves as if:
+     * <pre>{@code
+     *     return opEvaluateParallel(helper, spliterator, i -> (E_OUT[]) new Object[i]).spliterator();
+     * }</pre>
+     * and is suitable for implementations that cannot do better than a full synchronous
+     * evaluation.
+     *
+     * @param helper the pipeline helper
+     * @param spliterator the source {@code Spliterator}
+     * @return a {@code Spliterator} describing the result of the evaluation
+     */
+    @SuppressWarnings("unchecked")
+    default <P_IN> Spliterator<E_OUT> opEvaluateParallelLazy(PipelineHelper<E_OUT> helper,
+                                                             Spliterator<P_IN> spliterator) {
+        return opEvaluateParallel(helper, spliterator, i -> (E_OUT[]) new Object[i]).spliterator();
+    }
+}
--- a/test-ng/bootlib/java/util/stream/OpTestCase.java	Wed Apr 03 11:29:23 2013 -0700
+++ b/test-ng/bootlib/java/util/stream/OpTestCase.java	Wed Apr 03 21:51:55 2013 +0200
@@ -490,10 +490,10 @@
 
         AbstractPipeline createPipeline(StreamShape shape, Spliterator s, int flags) {
             switch (shape) {
-                case REFERENCE:    return new ReferencePipeline(s, flags);
-                case INT_VALUE:    return new IntPipeline(s, flags);
-                case LONG_VALUE:   return new LongPipeline(s, flags);
-                case DOUBLE_VALUE: return new DoublePipeline(s, flags);
+                case REFERENCE:    return new ReferencePipeline.Head<>(s, flags);
+                case INT_VALUE:    return new IntPipeline.Head(s, flags);
+                case LONG_VALUE:   return new LongPipeline.Head(s, flags);
+                case DOUBLE_VALUE: return new DoublePipeline.Head(s, flags);
                 default: throw new IllegalStateException("Unknown shape: " + shape);
             }
         }
@@ -522,7 +522,7 @@
 
     @SuppressWarnings({"rawtypes", "unchecked"})
     private static <T> AbstractPipeline<?, T, ?> chain(AbstractPipeline upstream, IntermediateOp<?, T> op) {
-        return (AbstractPipeline<?, T, ?>) upstream.pipeline(op);
+        return (AbstractPipeline<?, T, ?>) IntermediateOp.chain(upstream, op);
     }
 
     @SuppressWarnings({"rawtypes", "unchecked"})
@@ -577,12 +577,12 @@
         }
 
         @Override
-        public Sink<T> wrapSink(int flags, Sink<T> sink) {
+        public Sink<T> opWrapSink(int flags, Sink<T> sink) {
             return sink;
         }
 
         @Override
-        public int getOpFlags() {
+        public int opGetFlags() {
             return StreamOpFlag.IS_SHORT_CIRCUIT;
         }
 
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/test-ng/bootlib/java/util/stream/StatefulOp.java	Wed Apr 03 21:51:55 2013 +0200
@@ -0,0 +1,125 @@
+/*
+ * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.  Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+package java.util.stream;
+
+import java.util.Spliterator;
+import java.util.function.IntFunction;
+
+/**
+ * A stateful intermediate stream operation ({@link IntermediateOp}).
+ * <em>Stateful</em> means that state is accumulated as elements are processed.
+ * Examples of stateful operations are sorting, extracting a subsequence of the
+ * input, or removing duplicates.  Statefulness has an effect on how the
+ * operation can be parallelized.  Stateless operations parallelize trivially
+ * because they are homomorphisms under concatenation:
+ *
+ * <pre>
+ *     statelessOp(a || b) = statelessOp(a) || statelessOp(b)
+ * </pre>
+ *
+ * where {@code ||} denotes concatenation.  Stateful operations may still be
+ * parallelizable, but are not amenable to the automatic parallelization of
+ * stateless operations.  Accordingly, a stateful operation must provide its
+ * own parallel execution implementation
+ * ({@link IntermediateOp#opEvaluateParallel(PipelineHelper, java.util.Spliterator, java.util.function.IntFunction)})
+ * as well as {@link IntermediateOp#opWrapSink(int, Sink)}.
+ *
+ * @param <E> Type of input and output elements.
+ *
+ * @see IntermediateOp
+ * @see TerminalOp
+ * @since 1.8
+ */
+interface StatefulOp<E> extends IntermediateOp<E, E> {
+
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    public static<T> AbstractPipeline chain(AbstractPipeline upstream,
+                                            StatefulOp op) {
+        switch (op.outputShape()) {
+            case REFERENCE:
+                return new ReferencePipeline.StatefulOp<Object, T>(upstream, op.inputShape(), op.opGetFlags()) {
+                    @Override
+                    Sink opWrapSink(int flags, Sink sink) {
+                        return op.opWrapSink(flags, sink);
+                    }
+
+                    @Override
+                    <P_IN> Node<T> opEvaluateParallel(PipelineHelper<T> helper, Spliterator<P_IN> spliterator, IntFunction<T[]> generator) {
+                        return op.opEvaluateParallel(helper, spliterator, generator);
+                    }
+                };
+            case INT_VALUE:
+                return new IntPipeline.StatefulOp<Object>(upstream, op.inputShape(), op.opGetFlags()) {
+                    @Override
+                    Sink opWrapSink(int flags, Sink sink) {
+                        return op.opWrapSink(flags, sink);
+                    }
+
+                    @Override
+                    <P_IN> Node<Integer> opEvaluateParallel(PipelineHelper<Integer> helper, Spliterator<P_IN> spliterator, IntFunction<Integer[]> generator) {
+                        return (Node<Integer>) op.opEvaluateParallel((PipelineHelper<T>) helper, spliterator, null);
+                    }
+                };
+            case LONG_VALUE:
+                return new LongPipeline.StatefulOp<Object>(upstream, op.inputShape(), op.opGetFlags()) {
+                    @Override
+                    Sink opWrapSink(int flags, Sink sink) {
+                        return op.opWrapSink(flags, sink);
+                    }
+
+                    @Override
+                    <P_IN> Node<Long> opEvaluateParallel(PipelineHelper<Long> helper, Spliterator<P_IN> spliterator, IntFunction<Long[]> generator) {
+                        return (Node<Long>) op.opEvaluateParallel((PipelineHelper<T>) helper, spliterator, null);
+                    }
+                };
+            case DOUBLE_VALUE:
+                return new DoublePipeline.StatefulOp<Object>(upstream, op.inputShape(), op.opGetFlags()) {
+                    @Override
+                    Sink opWrapSink(int flags, Sink sink) {
+                        return op.opWrapSink(flags, sink);
+                    }
+
+                    @Override
+                    <P_IN> Node<Double> opEvaluateParallel(PipelineHelper<Double> helper, Spliterator<P_IN> spliterator, IntFunction<Double[]> generator) {
+                        return (Node<Double>) op.opEvaluateParallel((PipelineHelper<T>) helper, spliterator, null);
+                    }
+                };
+            default: throw new IllegalStateException(op.outputShape().toString());
+        }
+    }
+
+    /**
+     * Returns {@code true}.  Any overriding implementations must also return
+     * {@code true}.
+     * @implSpec The default implementation returns {@code true}
+     * @return {@code true}
+     */
+    @Override
+    default boolean opIsStateful() {
+        return true;
+    }
+
+    <P_IN> Node<E> opEvaluateParallel(PipelineHelper<E> helper, Spliterator<P_IN> spliterator, IntFunction<E[]> generator);
+}
--- a/test-ng/bootlib/java/util/stream/TestFlagExpectedOp.java	Wed Apr 03 11:29:23 2013 -0700
+++ b/test-ng/bootlib/java/util/stream/TestFlagExpectedOp.java	Wed Apr 03 21:51:55 2013 +0200
@@ -100,7 +100,7 @@
 
     @Override
     @SuppressWarnings({"rawtypes", "unchecked"})
-    public Sink<T> wrapSink(int flags, Sink upstream) {
+    public Sink<T> opWrapSink(int flags, Sink upstream) {
         assertFlags(flags);
         return upstream;
     }
--- a/test-ng/boottests/java/util/stream/FlagOpTest.java	Wed Apr 03 11:29:23 2013 -0700
+++ b/test-ng/boottests/java/util/stream/FlagOpTest.java	Wed Apr 03 21:51:55 2013 +0200
@@ -70,7 +70,7 @@
 
         @Override
         @SuppressWarnings({"unchecked", "rawtypes"})
-        public Sink<T> wrapSink(int flags, Sink sink) {
+        public Sink<T> opWrapSink(int flags, Sink sink) {
             this.wrapFlags = flags;
 
             if (downstream != null) {
@@ -271,7 +271,7 @@
 
         @Override
         @SuppressWarnings({"unchecked", "rawtypes"})
-        public Sink<T> wrapSink(int flags, Sink upstream) {
+        public Sink<T> opWrapSink(int flags, Sink upstream) {
             assertFlags(flags);
             return upstream;
         }
--- a/test-ng/boottests/java/util/stream/UnorderedTest.java	Wed Apr 03 11:29:23 2013 -0700
+++ b/test-ng/boottests/java/util/stream/UnorderedTest.java	Wed Apr 03 21:51:55 2013 +0200
@@ -219,7 +219,7 @@
         }
 
         @Override
-        public Sink<T> wrapSink(int flags, Sink<T> sink) {
+        public Sink<T> opWrapSink(int flags, Sink<T> sink) {
             if (StreamOpFlag.PARALLEL.isKnown(flags)) {
                 assertTrue(StreamOpFlag.ORDERED.isCleared(flags));
             }
@@ -235,7 +235,7 @@
         }
 
         @Override
-        public Sink<T> wrapSink(int flags, Sink<T> sink) {
+        public Sink<T> opWrapSink(int flags, Sink<T> sink) {
             assertTrue(StreamOpFlag.ORDERED.isKnown(flags) || StreamOpFlag.ORDERED.isPreserved(flags));
 
             return sink;
--- a/test-ng/tests/org/openjdk/tests/java/util/stream/RangeTest.java	Wed Apr 03 11:29:23 2013 -0700
+++ b/test-ng/tests/org/openjdk/tests/java/util/stream/RangeTest.java	Wed Apr 03 21:51:55 2013 +0200
@@ -33,11 +33,14 @@
 import java.util.stream.StreamTestData;
 import java.util.stream.Streams;
 
+import org.testng.annotations.Test;
+
 /**
  * Primitive range tests
  *
  * @author Brian Goetz
  */
+@Test
 public class RangeTest extends OpTestCase {
 
     public void testInfiniteRangeFindFirst() {
@@ -255,7 +258,7 @@
                 "long range", () -> Streams.longRange(start, end, step));
     }
 
-    public void tesLongRangeReduce() {
+    public void testLongRangeReduce() {
         withData(longRangeData(0, 10000, 1)).
                 terminal(s -> s.reduce(0, Long::sum)).exercise();
     }
--- a/test-ng/tests/org/openjdk/tests/java/util/stream/TabulatorsTest.java	Wed Apr 03 11:29:23 2013 -0700
+++ b/test-ng/tests/org/openjdk/tests/java/util/stream/TabulatorsTest.java	Wed Apr 03 21:51:55 2013 +0200
@@ -64,6 +64,7 @@
  *
  * @author Brian Goetz
  */
+@SuppressWarnings({"rawtypes", "unchecked"})
 public class TabulatorsTest extends OpTestCase {
     // There are 8 versions of groupingBy:
     //   groupingBy: { map supplier, not } x { downstream collector, not } x { concurrent, not }