changeset 7776:9335e621253a

Refactor AbstractPipeline - remove the dependence on Supplier<Spliterator> - remove all state from PipelineHelper (in preparation for it to be merged into AbstractPipeline) - par/seq methods now conform to the current spec Contributed-by: Brian Goetz <brian.goetz@Oracle.COM>, Paul Sandoz <Paul.Sandoz@oracle.com>
author psandoz
date Fri, 29 Mar 2013 21:57:00 +0100
parents 5887f3e72b2c
children ed04983cbf86
files src/share/classes/java/util/stream/AbstractPipeline.java src/share/classes/java/util/stream/AbstractTask.java src/share/classes/java/util/stream/DistinctOp.java src/share/classes/java/util/stream/DoublePipeline.java src/share/classes/java/util/stream/ForEachOps.java src/share/classes/java/util/stream/IntPipeline.java src/share/classes/java/util/stream/IntermediateOp.java src/share/classes/java/util/stream/LongPipeline.java src/share/classes/java/util/stream/NodeUtils.java src/share/classes/java/util/stream/PipelineHelper.java src/share/classes/java/util/stream/ReferencePipeline.java src/share/classes/java/util/stream/SliceOp.java src/share/classes/java/util/stream/SortedOp.java src/share/classes/java/util/stream/StatefulOp.java src/share/classes/java/util/stream/StreamOpFlag.java src/share/classes/java/util/stream/StreamSpliterators.java src/share/classes/java/util/stream/Streams.java test-ng/bootlib/java/util/stream/CollectorOps.java test-ng/bootlib/java/util/stream/LambdaTestHelpers.java test-ng/bootlib/java/util/stream/OpTestCase.java test-ng/boottests/java/util/stream/UnorderedTest.java test-ng/build.xml test-ng/tests/org/openjdk/tests/java/util/stream/StreamParSeqTest.java
diffstat 23 files changed, 583 insertions(+), 563 deletions(-) [+]
line wrap: on
line diff
--- a/src/share/classes/java/util/stream/AbstractPipeline.java	Thu Mar 28 18:32:00 2013 -0700
+++ b/src/share/classes/java/util/stream/AbstractPipeline.java	Fri Mar 29 21:57:00 2013 +0100
@@ -26,8 +26,7 @@
 
 import java.util.Objects;
 import java.util.Spliterator;
-import java.util.function.IntFunction;
-import java.util.function.Supplier;
+import java.util.function.*;
 
 /**
  * Abstract base class for "pipeline" classes, which are the core implementations of the Stream interface and its
@@ -59,63 +58,68 @@
  * @since 1.8
  */
 abstract class AbstractPipeline<E_IN, E_OUT, S extends BaseStream<E_OUT, S>> /* implements BaseStream */ {
+    /** Backlink to the head of the pipeline chain (to self if this is the head) */
+    protected final AbstractPipeline head;
+
     /** The "upstream" pipeline, or null if this pipeline object represents the stream source */
-    protected final AbstractPipeline upstream;
+    protected final AbstractPipeline previous;
 
-    /** The number of intermediate operations between this pipeline object and the stream source */
-    protected final int depth;
+    /** The next stage in the pipeline, or null if this is the last stage
+     * Effectively final at the point of linking to the next pipeline.
+     */
+    protected AbstractPipeline next;
 
     /** The intermediate operation represented by this pipeline object, or null if this pipeline object
      * represents the stream source */
     protected final IntermediateOp op;
 
+    /** The stream flags if this pipeline is the head, or the op flags if this pipeline is an intermediate op stage
+     * Effectively final for all but the head, where the
+     * {@link StreamOpFlag#IS_PARALLEL} bit is set or cleared.
+     */
+    protected int rawFlags;
+
+    /** The number of intermediate operations between this pipeline object
+     * and the stream source if sequential, or the previous stateful if parallel.
+     * Valid at the point of pipeline preparation for evaluation.
+     */
+    protected int depth;
+
     /** The combined source and operation flags for the source and all operations up to and including the
-     * operation represented by this pipeline object
+     * operation represented by this pipeline object.
+     * Valid at the point of pipeline preparation for evaluation.
      */
-    protected final int combinedSourceAndOpFlags;
+    protected int combinedFlags;
 
     /**
-     * The source spliterator for this pipeline object, which is common to all pipeline objects in the same
-     * stream pipeline.
+     * The source spliterator. Only valid for the head pipeline.
+     * Before the pipeline is consumed if non-null then {@code sourceSupplier} is be null.
+     * After the pipeline is consumed if non-null then is set to null.
      */
-    private final Supplier<? extends Spliterator<?>> source;
+    private Spliterator<?> sourceSpliterator;
+    /**
+     * The source supplier. Only valid for the head pipeline.
+     * Before the pipeline is consumed if non-null then {@code sourceSpliterator} is be null.
+     * After the pipeline is consumed if non-null then is set to null.
+     */
+    private Supplier<? extends Spliterator<?>> sourceSupplier;
 
-    private static enum PipelineState {
-        UNLINKED() {
-            @Override
-            PipelineState transitionTo(PipelineState newState) {
-                assert newState != UNLINKED;
-                return newState;
-            }
-        },
-        LINKED() {
-            @Override
-            PipelineState transitionTo(PipelineState newState) {
-                throw new IllegalStateException("Stream is already linked to a child stream");
-            }
-        },
-        CONSUMED() {
-            @Override
-            PipelineState transitionTo(PipelineState newState) {
-                throw new IllegalStateException("Stream source is already consumed");
-            }
-        };
+    /** True if this pipeline has been consumed */
+    boolean consumed;
 
-        abstract PipelineState transitionTo(PipelineState newState);
-    }
-
-    private PipelineState state = PipelineState.UNLINKED;
-
-    private AbstractPipeline(AbstractPipeline upstream,
-                             int depth,
-                             IntermediateOp op,
-                             int combinedSourceAndOpFlags,
-                             Supplier<? extends Spliterator<?>> source) {
-        this.upstream = upstream;
-        this.depth = depth;
-        this.op = op;
-        this.combinedSourceAndOpFlags = combinedSourceAndOpFlags;
-        this.source = source;
+    /**
+     * Constructor for the head of a stream pipeline.
+     *
+     * @param source {@code Supplier<Spliterator>} describing the stream source
+     * @param sourceFlags The source flags for the stream source, described in {@link StreamOpFlag}
+     */
+    protected AbstractPipeline(Supplier<? extends Spliterator<?>> source,
+                               int sourceFlags) {
+        this.previous = null;
+        this.op = null;
+        this.sourceSupplier = source;
+        this.head = this;
+        this.rawFlags = sourceFlags;
     }
 
     /**
@@ -124,38 +128,37 @@
      * @param source {@code Spliterator} describing the stream source
      * @param sourceFlags The source flags for the stream source, described in {@link StreamOpFlag}
      */
-    protected AbstractPipeline(Supplier<? extends Spliterator<?>> source,
+    protected AbstractPipeline(Spliterator<?> source,
                                int sourceFlags) {
-        this(null, 0, null,
-             StreamOpFlag.combineOpFlags(sourceFlags, StreamOpFlag.INITIAL_OPS_VALUE),
-             Objects.requireNonNull(source));
+        this.previous = null;
+        this.op = null;
+        this.sourceSpliterator = source;
+        this.head = this;
+        this.rawFlags = sourceFlags;
     }
 
     /**
      * Constructor for appending an intermediate operation onto an existing pipeline.
      *
-     * @param upstream the upstream element source.
+     * @param previous the upstream element source.
      * @param op the operation performed upon elements.
      */
-    protected AbstractPipeline(AbstractPipeline<?, E_IN, ?> upstream,
+    protected AbstractPipeline(AbstractPipeline<?, E_IN, ?> previous,
                                IntermediateOp<E_IN, E_OUT> op) {
-        this(upstream, upstream.depth + 1,
-             op, StreamOpFlag.combineOpFlags(op.getOpFlags() & StreamOpFlag.OP_MASK,
-                                             upstream.combinedSourceAndOpFlags),
-             upstream.source);
+        assert getOutputShape() == op.outputShape();
+        assert previous.getOutputShape() == op.inputShape();
+        if (previous.consumed)
+            throw new IllegalStateException("stream has already been operated upon");
+        previous.consumed = true;
+        previous.next = this;
 
-        assert !op.isStateful() || !isParallel();
-        assert getOutputShape() == op.outputShape();
-        assert upstream.getOutputShape() == op.inputShape();
-
-        upstream.transitionTo(PipelineState.LINKED);
+        this.previous = previous;
+        this.op = op;
+        this.rawFlags = op.getOpFlags() & StreamOpFlag.OP_MASK;
+        this.head = previous.head;
     }
 
-    private void transitionTo(PipelineState newState) {
-        state = state.transitionTo(newState);
-    }
-
-    private static <T> IntFunction<T[]> objectArrayGenerator() {
+    static <T> IntFunction<T[]> objectArrayGenerator() {
         return size -> (T[]) new Object[size];
     }
 
@@ -171,31 +174,7 @@
      */
     @SuppressWarnings("unchecked")
     public <E_NEXT, S_NEXT extends BaseStream<E_NEXT, S_NEXT>> S_NEXT pipeline(IntermediateOp<E_OUT, E_NEXT> newOp) {
-        assert getOutputShape() == newOp.inputShape();
-        if (newOp.isStateful() && isParallel()) {
-            transitionTo(PipelineState.LINKED);
-
-            // @@@ the newFlags and the node.spliterator().characteristics() will be out of sync
-            //     If a stream.spliterator() is obtained then what guarantees should be made about
-            //     the characteristics?
-            int newFlags = StreamOpFlag.toStreamFlags(
-                    StreamOpFlag.combineOpFlags(newOp.getOpFlags() & StreamOpFlag.OP_MASK, combinedSourceAndOpFlags));
-            return (S_NEXT) stream(
-                    new NodeSpliteratorSupplier<E_NEXT>() {
-                        Node<E_NEXT> n = null;
-
-                        @Override
-                        public Node<E_NEXT> getNode(PipelineHelperImpl downstream) {
-                            if (n == null) {
-                                n = newOp.evaluateParallel(new PipelineHelperImpl(downstream));
-                            }
-                            return n;
-                        }
-                    },
-                    newFlags | StreamOpFlag.IS_SIZED);
-        }
-        else
-            return (S_NEXT) chain(this, newOp);
+        return (S_NEXT) chain(this, newOp);
     }
 
     /** Specialized version of pipeline for stateless reference-bearing intermediate ops */
@@ -230,15 +209,64 @@
      * @return the result.
      */
     public <R> R pipeline(TerminalOp<E_OUT, R> terminalOp) {
-        return pipeline(terminalOp, AbstractPipeline.objectArrayGenerator());
+        assert getOutputShape() == terminalOp.inputShape();
+        if (consumed)
+            throw new IllegalStateException("stream has already been operated upon");
+        consumed = true;
+
+        prepare(terminalOp.getOpFlags());
+        PipelineHelperImpl<E_IN> helper = new PipelineHelperImpl<>();
+        return isParallel()
+               ? terminalOp.evaluateParallel(helper)
+               : terminalOp.evaluateSequential(helper);
     }
 
-    private <R> R pipeline(TerminalOp<E_OUT, R> terminalOp, IntFunction<E_OUT[]> generator) {
-        assert getOutputShape() == terminalOp.inputShape();
-        transitionTo(PipelineState.CONSUMED);
+    /**
+     * Prepare the pipeline for evaluation.
+     * @param terminalFlags
+     */
+    private void prepare(int terminalFlags) {
+        head.combinedFlags = StreamOpFlag.combineOpFlags(head.rawFlags, StreamOpFlag.INITIAL_OPS_VALUE);
+        int depth = 1;
+        if (isParallel()) {
+            AbstractPipeline backPropagationHead = head;
+            for (AbstractPipeline u = head, p = head.next;
+                 p != null;
+                 u = p, p = p.next) {
+                p.depth = depth++;
+                int thisOpFlags = p.rawFlags;
 
-        PipelineHelperImpl<E_IN> helper = new PipelineHelperImpl<>(terminalOp, generator);
-        return isParallel() ? terminalOp.evaluateParallel(helper) : terminalOp.evaluateSequential(helper);
+                if (p.op.isStateful()) {
+                    // If the stateful operation is a short-circuit operation
+                    // then move the back propagation head forwards
+                    // NOTE: there are no size-injecting ops
+                    if (StreamOpFlag.SHORT_CIRCUIT.isKnown(thisOpFlags)) {
+                        backPropagationHead = p;
+                    }
+
+                    p.depth = 0;
+                    depth = 1;
+                    // The following injects size, it is equivalent to:
+                    // StreamOpFlag.combineOpFlags(StreamOpFlag.IS_SIZED, p.combinedFlags);
+                    thisOpFlags = (thisOpFlags & ~StreamOpFlag.NOT_SIZED) | StreamOpFlag.IS_SIZED;
+                }
+                p.combinedFlags = StreamOpFlag.combineOpFlags(thisOpFlags, u.combinedFlags);
+            }
+
+            // Apply the upstream terminal flags
+            int upstreamTerminalFlags = terminalFlags & StreamOpFlag.UPSTREAM_TERMINAL_OP_MASK;
+            for (AbstractPipeline p = backPropagationHead; p != null; p = p.next) {
+                p.combinedFlags = StreamOpFlag.combineOpFlags(upstreamTerminalFlags, p.combinedFlags);
+            }
+        }
+        else {
+            for (AbstractPipeline u = head, p = head.next; p != null; u = p, p = p.next) {
+                p.combinedFlags = StreamOpFlag.combineOpFlags(p.rawFlags, u.combinedFlags);
+                p.depth = depth++;
+            }
+        }
+        // Update last stage to incorporate terminal flags
+        combinedFlags = StreamOpFlag.combineOpFlags(terminalFlags, combinedFlags);
     }
 
     /**
@@ -264,24 +292,27 @@
      * @return a node that holds the collected output elements.
      */
     public Node<E_OUT> collectOutput(boolean flatten, IntFunction<E_OUT[]> generator) {
-        return pipeline(
-                new TerminalOp<E_OUT, Node<E_OUT>>() {
-                    @Override
-                    public StreamShape inputShape() {
-                        return getOutputShape();
-                    }
+        if (consumed)
+            throw new IllegalStateException("stream has already been operated upon");
+        consumed = true;
 
-                    @Override
-                    public <P_IN> Node<E_OUT> evaluateParallel(PipelineHelper<P_IN, E_OUT> helper) {
-                        return helper.collectOutput(flatten);
-                    }
+        prepare(0);
+        if (isParallel()) {
+            // If the last intermediate operation is stateful then
+            // evaluate directly to avoid an extra collection step
+            if (op != null && op.isStateful()) {
+                PipelineHelperImpl helper = previous.new PipelineHelperImpl();
+                @SuppressWarnings("rawtypes")
+                Node<E_OUT> node = op.evaluateParallel(helper, generator);
+                if (flatten) {
+                    node = flatten(node, generator);
+                }
+                return node;
+            }
+        }
 
-                    @Override
-                    public <P_IN> Node<E_OUT> evaluateSequential(PipelineHelper<P_IN, E_OUT> helper) {
-                        return helper.collectOutput(flatten);
-                    }
-                },
-                generator);
+        PipelineHelperImpl<E_IN> helper = new PipelineHelperImpl<>();
+        return helper.collectOutput(flatten, generator);
     }
 
     /** Common implementation of {@code limit()} / {@code substream} for all shapes */
@@ -292,28 +323,16 @@
 
     // Implements sequential() from BaseStream
     public S sequential() {
-        return !isParallel() ? (S) this : pipeline(new FlagSettingOp<E_OUT>(StreamOpFlag.NOT_PARALLEL));
+        if (StreamOpFlag.PARALLEL.isKnown(head.rawFlags))
+            head.rawFlags &= ~StreamOpFlag.IS_PARALLEL;
+        return (S) this;
     }
 
     // Implements parallel() from BaseStream
     public S parallel() {
-        return isParallel() ? (S) this : pipeline(new FlagSettingOp<E_OUT>(StreamOpFlag.IS_PARALLEL));
-    }
-
-    private Supplier<Spliterator<E_OUT>> spliteratorSupplier() {
-        // @@@ Should we query the spliterator to see if it is splittable, and if not, get iterator and wrap?
-        if (depth == 0) {
-            return (Supplier<Spliterator<E_OUT>>) source;
-        }
-        else {
-            return new UpstreamSpliteratorSupplier<E_OUT>() {
-                @Override
-                Spliterator<E_OUT> get(PipelineHelperImpl downstream) {
-                    PipelineHelperImpl<E_IN> ph = new PipelineHelperImpl<>(downstream);
-                    return AbstractPipeline.this.wrap(ph, ph.sourceSpliterator(), ph.isParallel());
-                }
-            };
-        }
+        if (!StreamOpFlag.PARALLEL.isKnown(head.rawFlags))
+            head.rawFlags |= StreamOpFlag.IS_PARALLEL;
+        return (S) this;
     }
 
     /**
@@ -330,28 +349,31 @@
      * @param helper the parallel pipeline helper from which elements are obtained.
      * @param flattenTree true of the returned node should be flattened to one node holding an
      *                    array of elements.
-     * @param <P_IN> the type of elements input into the pipeline.
+     * @param generator
      * @return the node holding elements output from the pipeline.
      */
-    protected abstract<P_IN> Node<E_OUT> collect(PipelineHelper<P_IN, E_OUT> helper, boolean flattenTree);
+    protected abstract<P_IN> Node<E_OUT> collect(PipelineHelper<P_IN, E_OUT> helper,
+                                                 boolean flattenTree, IntFunction<E_OUT[]> generator);
 
     /**
      * Flatten a node.
      *
      * @param node the node to flatten.
+     * @param generator
      * @return the flattened node.
      */
-    protected abstract <P_IN> Node<E_OUT> flatten(PipelineHelper<P_IN, E_OUT> helper, Node<E_OUT> node);
+    protected abstract Node<E_OUT> flatten(Node<E_OUT> node,
+                                           IntFunction<E_OUT[]> generator);
 
     /**
      * Create a spliterator that wraps a source spliterator, compatible with this stream shape,
      * and operations associated with a {@link PipelineHelper}.
      *
      * @param ph the pipeline helper.
-     * @param spliterator
      * @return the wrapping spliterator compatible with this shape.
      */
-    protected abstract<P_IN> Spliterator<E_OUT> wrap(PipelineHelper<P_IN, E_OUT> ph, Spliterator<P_IN> spliterator, boolean isParallel);
+    protected abstract<P_IN> Spliterator<E_OUT> wrap(PipelineHelper<P_IN, E_OUT> ph,
+                                                     boolean isParallel);
 
     /**
      * Create a lazy spliterator that wraps and obtains the supplied the spliterator
@@ -361,16 +383,6 @@
     protected abstract Spliterator<E_OUT> lazySpliterator(Supplier<? extends Spliterator<E_OUT>> supplier);
 
     /**
-     * Create a pipeline from a spliterator compatible with this stream shape.
-     *
-     * @param supplier a Supplier for the compatible spliterator
-     * @param flags the stream flags.
-     * @return the pipeline, whose content is sourced from the spliterator.
-     * @throws IllegalArgumentException if the spliterator is not compatible with this stream shape.
-     */
-    protected abstract <T> AbstractPipeline<?, T, ?> stream(Supplier<? extends Spliterator<?>> supplier, int flags);
-
-    /**
      * Traverse elements of a spliterator, compatible with this stream shape, pushing those elements into a sink.
      * <p>If the sink is cancelled no further elements will be pulled or pushed and this method will return.</p>
      *
@@ -417,9 +429,30 @@
 
     // from BaseStream
     public Spliterator<E_OUT> spliterator() {
-        transitionTo(PipelineState.CONSUMED);
+        if (consumed)
+            throw new IllegalStateException("stream has already been operated upon");
+        consumed = true;
 
-        return lazySpliterator(spliteratorSupplier());
+        prepare(0);
+        if (this == head) {
+            if (head.sourceSpliterator != null) {
+                Spliterator<E_OUT> s = head.sourceSpliterator;
+                head.sourceSpliterator = null;
+                return s;
+            }
+            else if (head.sourceSupplier != null) {
+                Supplier<Spliterator<E_OUT>> s = head.sourceSupplier;
+                head.sourceSupplier = null;
+                return lazySpliterator(s);
+            }
+            else {
+                throw new IllegalStateException("source already consumed");
+            }
+        }
+        else {
+            PipelineHelperImpl<E_IN> helper = new PipelineHelperImpl<>();
+            return wrap(helper, isParallel());
+        }
     }
 
     /**
@@ -431,12 +464,15 @@
      * @see StreamOpFlag
      */
     protected int getStreamFlags() {
-        return StreamOpFlag.toStreamFlags(combinedSourceAndOpFlags);
+        // @@@ Currently only used by tests, review and see if functionality
+        //     can be replaced by spliterator().characteristics()
+        prepare(0);
+        return StreamOpFlag.toStreamFlags(combinedFlags);
     }
 
     // from BaseStream
     public boolean isParallel() {
-        return StreamOpFlag.PARALLEL.isKnown(combinedSourceAndOpFlags);
+        return StreamOpFlag.PARALLEL.isKnown(head.rawFlags);
     }
 
     interface SinkWrapper<T> {
@@ -473,84 +509,7 @@
         }
     }
 
-    private abstract class UpstreamSpliteratorSupplier<T> implements Supplier<Spliterator<T>> {
-
-        UpstreamSpliteratorSupplier() {} // Avoid creation of special accessor
-
-        @Override
-        public Spliterator<T> get() {
-            // This may occur when a spliterator is obtained for a pipeline of zero depth
-            return get(null);
-        }
-
-        abstract Spliterator<T> get(PipelineHelperImpl downstream);
-    }
-
-    private abstract class NodeSpliteratorSupplier<T> extends UpstreamSpliteratorSupplier<T> {
-
-        NodeSpliteratorSupplier() {} // Avoid creation of special accessor
-
-        @Override
-        public Spliterator<T> get(PipelineHelperImpl downstream) {
-            return getNode(downstream).spliterator();
-        }
-
-        abstract Node<T> getNode(PipelineHelperImpl downstream);
-    }
-
     private final class PipelineHelperImpl<P_IN> implements PipelineHelper<P_IN, E_OUT> {
-        private final int terminalFlags;
-        private final int streamAndOpFlags;
-        private final IntFunction<E_OUT[]> generator;
-
-        // Source spliterator
-        private Spliterator<P_IN> spliterator;
-
-        PipelineHelperImpl(int terminalFlags, IntFunction<E_OUT[]> generator) {
-            // If a sequential pipeline then unset the clearing of ORDERED
-            // @@@ Should a general mask be used e.g. TERMINAL_OP_SEQUENTIAL_MASK
-            this.terminalFlags = isParallel() ? terminalFlags : terminalFlags & ~StreamOpFlag.ORDERED.clear();
-            this.streamAndOpFlags = StreamOpFlag.combineOpFlags(this.terminalFlags, combinedSourceAndOpFlags);
-            this.generator = generator;
-        }
-
-        PipelineHelperImpl(PipelineHelperImpl downstream) {
-            // If downstream == null if there is no downstream pipeline and this helper encapsulates
-            // the terminal pipeline for a spliterator() terminal operation
-
-            // If the depth is zero then propagate back the generator
-            // Since stateful ops have the same input and output type it is guaranteed
-            // that the array generator will generate instances of the correct array type
-            // (assuming no user-error).
-            this(downstream != null ? downstream.terminalFlags() & StreamOpFlag.UPSTREAM_TERMINAL_OP_MASK : 0,
-                 downstream != null && downstream.depth() == 0 ? downstream.arrayGenerator() : AbstractPipeline.objectArrayGenerator());
-        }
-
-        PipelineHelperImpl(TerminalOp terminalOp, IntFunction<E_OUT[]> generator) {
-            this(terminalOp.getOpFlags() & StreamOpFlag.TERMINAL_OP_MASK,
-                 generator);
-        }
-
-        boolean isTraversing() {
-            return spliterator != null;
-        }
-
-        Node<E_OUT> getSourceNodeIfAvailable() {
-            if (depth == 0 && source instanceof NodeSpliteratorSupplier) {
-                return ((NodeSpliteratorSupplier<E_OUT>) source).getNode(this);
-            }
-            else {
-                return null;
-            }
-        }
-
-        int depth() {
-            return depth;
-        }
-
-        int terminalFlags() {
-            return terminalFlags;
-        }
 
         @Override
         public long exactOutputSizeIfKnown(Spliterator<P_IN> spliterator) {
@@ -581,7 +540,7 @@
         public void intoWrappedWithCancel(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
             AbstractPipeline p = AbstractPipeline.this;
             while (p.depth > 0) {
-                p = p.upstream;
+                p = p.previous;
             }
             wrappedSink.begin(spliterator.getExactSizeIfKnown());
             p.forEachWithCancel(spliterator, wrappedSink);
@@ -589,52 +548,51 @@
         }
 
         @Override
-        public StreamShape getInputShape() {
-            AbstractPipeline p = AbstractPipeline.this;
-            while (p.depth > 0) {
-                p = p.upstream;
-            }
-            return p.getOutputShape();
-        }
-
-        @Override
-        public StreamShape getOutputShape() {
-            return AbstractPipeline.this.getOutputShape();
-        }
-
-        @Override
         public int getStreamAndOpFlags() {
-            return streamAndOpFlags;
-        }
-
-        @Override
-        public int getTerminalOpFlags() {
-            return terminalFlags;
+            return combinedFlags;
         }
 
         @Override
         public Spliterator<P_IN> sourceSpliterator() {
-            if (spliterator == null) {
-                spliterator = (source instanceof UpstreamSpliteratorSupplier)
-                              ? ((UpstreamSpliteratorSupplier<P_IN>) source).get(this)
-                              : (Spliterator<P_IN>) source.get();
+            if (isParallel()) {
+                // Find the last stateful op
+                AbstractPipeline p = AbstractPipeline.this;
+                while (p.op != null && !p.op.isStateful()) {
+                    p = p.previous;
+                }
+                if (p.op != null) {
+                    PipelineHelperImpl helper = p.previous.new PipelineHelperImpl();
+                    return p.op.evaluateParallel(helper, objectArrayGenerator()).spliterator();
+                }
             }
-            return spliterator;
+
+            if (head.sourceSpliterator != null) {
+                Spliterator<P_IN> result = (Spliterator<P_IN>) head.sourceSpliterator;
+                head.sourceSpliterator = null;
+                return result;
+            }
+            else if (head.sourceSupplier != null) {
+                Spliterator<P_IN> result = (Spliterator<P_IN>) head.sourceSupplier.get();
+                head.sourceSupplier = null;
+                return result;
+            }
+            else {
+                throw new IllegalStateException("source already consumed");
+            }
         }
 
         @Override
         public Sink<P_IN> wrapSink(Sink sink) {
             Objects.requireNonNull(sink);
 
-            int upstreamTerminalFlags = terminalFlags & StreamOpFlag.UPSTREAM_TERMINAL_OP_MASK;
-            for (AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.upstream) {
-                sink = p.op.wrapSink(StreamOpFlag.combineOpFlags(upstreamTerminalFlags, p.upstream.combinedSourceAndOpFlags), sink);
+            for (AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previous) {
+                sink = p.op.wrapSink(p.previous.combinedFlags, sink);
             }
             return sink;
         }
 
         @Override
-        public Node.Builder<E_OUT> makeNodeBuilder(long exactSizeIfKnown) {
+        public Node.Builder<E_OUT> makeNodeBuilder(long exactSizeIfKnown, IntFunction<E_OUT[]> generator) {
             return AbstractPipeline.this.makeNodeBuilder(exactSizeIfKnown, generator);
         }
 
@@ -645,73 +603,32 @@
 
         @Override
         @SuppressWarnings("unchecked")
-        public Node<E_OUT> collectOutput(boolean flatten) {
-            if (isTraversing())
-                throw new IllegalStateException();
-
-            Node<E_OUT> n = getSourceNodeIfAvailable();
-            if (n != null) {
-                // @@@ Do we always need to flatten, or only if flatten=true?
-                return AbstractPipeline.this.flatten(this, n);
-            } else {
-                if (isParallel()) {
-                    return AbstractPipeline.this.collect(this, flatten);
-                }
-                else {
-                    Node.Builder<E_OUT> nb = makeNodeBuilder(
-                            exactOutputSizeIfKnown(sourceSpliterator()));
-                    return into(nb, sourceSpliterator()).build();
-                }
+        public Node<E_OUT> collectOutput(boolean flatten, IntFunction<E_OUT[]> generator) {
+            if (isParallel()) {
+                return collect(this, flatten, generator);
+            }
+            else {
+                Spliterator<P_IN> sourceSpliterator = sourceSpliterator();
+                Node.Builder<E_OUT> nb = makeNodeBuilder(
+                        exactOutputSizeIfKnown(sourceSpliterator), generator);
+                return into(nb, sourceSpliterator).build();
             }
         }
 
         @Override
-        @SuppressWarnings("unchecked")
-        public IntFunction<E_OUT[]> arrayGenerator() {
-            return generator;
-        }
-
-        @Override
-        public Node<E_OUT> evaluateSequential(IntermediateOp<E_OUT, E_OUT> op) {
+        public Node<E_OUT> evaluateSequential(IntermediateOp<E_OUT, E_OUT> op,
+                                              Spliterator<P_IN> sourceSpliterator, IntFunction<E_OUT[]> generator) {
             long sizeIfKnown = StreamOpFlag.SIZED.isPreserved(op.getOpFlags())
-                               ? exactOutputSizeIfKnown(sourceSpliterator())
+                               ? exactOutputSizeIfKnown(sourceSpliterator)
                                : -1;
-            final Node.Builder<E_OUT> nb = makeNodeBuilder(sizeIfKnown);
+            final Node.Builder<E_OUT> nb = makeNodeBuilder(sizeIfKnown, generator);
             Sink<E_OUT> opSink = op.wrapSink(getStreamAndOpFlags(), nb);
 
             if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(op.getOpFlags()))
-                into(opSink, sourceSpliterator());
+                into(opSink, sourceSpliterator);
             else
-                intoWrappedWithCancel(wrapSink(opSink), sourceSpliterator());
+                intoWrappedWithCancel(wrapSink(opSink), sourceSpliterator);
             return nb.build();
         }
     }
-
-    private class FlagSettingOp<E_OUT> implements IntermediateOp<E_OUT, E_OUT> {
-        private final int flags;
-
-        private FlagSettingOp(int flags) {
-            this.flags = flags;
-        }
-
-        @Override
-        public StreamShape outputShape() {
-            return getOutputShape();
-        }
-
-        @Override
-        public StreamShape inputShape() {
-            return getOutputShape();
-        }
-
-        @Override
-        public int getOpFlags() {
-            return flags;
-        }
-
-        @Override
-        public Sink<E_OUT> wrapSink(int flags, Sink<E_OUT> sink) {
-            return sink;
-        }
-    }
 }
--- a/src/share/classes/java/util/stream/AbstractTask.java	Thu Mar 28 18:32:00 2013 -0700
+++ b/src/share/classes/java/util/stream/AbstractTask.java	Fri Mar 29 21:57:00 2013 +0100
@@ -120,6 +120,19 @@
     }
 
     /**
+     * Alternate constructor for root nodes that have already gotten the
+     * Spliterator from the helper.
+     */
+    protected AbstractTask(PipelineHelper<P_IN, P_OUT> helper,
+                           Spliterator<P_IN> spliterator,
+                           long targetSize) {
+        super(null);
+        this.helper = helper;
+        this.spliterator = spliterator;
+        this.targetSize = targetSize;
+    }
+
+    /**
      * Constructor for non-root nodes
      *
      * @param parent This node's parent task
--- a/src/share/classes/java/util/stream/DistinctOp.java	Thu Mar 28 18:32:00 2013 -0700
+++ b/src/share/classes/java/util/stream/DistinctOp.java	Fri Mar 29 21:57:00 2013 +0100
@@ -30,6 +30,7 @@
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.IntFunction;
 
 /**
  * A {@link StatefulOp} that eliminates duplicates from a stream.
@@ -109,10 +110,10 @@
     }
 
     @Override
-    public <S> Node<T> evaluateParallel(PipelineHelper<S, T> helper) {
+    public <S> Node<T> evaluateParallel(PipelineHelper<S, T> helper, IntFunction<T[]> generator) {
         if (StreamOpFlag.DISTINCT.isKnown(helper.getStreamAndOpFlags())) {
             // No-op
-            return helper.collectOutput(false);
+            return helper.collectOutput(false, generator);
         }
         else if (StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
             // If the stream is SORTED then it should also be ORDERED so the following will also
--- a/src/share/classes/java/util/stream/DoublePipeline.java	Thu Mar 28 18:32:00 2013 -0700
+++ b/src/share/classes/java/util/stream/DoublePipeline.java	Fri Mar 29 21:57:00 2013 +0100
@@ -54,10 +54,20 @@
     /**
      * Constructor for the head of a stream pipeline.
      *
+     * @param source {@code Supplier<Spliterator>} describing the stream source
+     * @param sourceFlags The source flags for the stream source, described in {@link StreamOpFlag}
+     */
+    public DoublePipeline(Supplier<? extends Spliterator<Double>> source, int sourceFlags) {
+        super(source, sourceFlags);
+    }
+
+    /**
+     * Constructor for the head of a stream pipeline.
+     *
      * @param source {@code Spliterator} describing the stream source
      * @param sourceFlags The source flags for the stream source, described in {@link StreamOpFlag}
      */
-    public DoublePipeline(Supplier<? extends Spliterator<Double>> source, int sourceFlags) {
+    public DoublePipeline(Spliterator<Double> source, int sourceFlags) {
         super(source, sourceFlags);
     }
 
@@ -79,38 +89,19 @@
     }
 
     @Override
-    protected <P_IN> Node<Double> collect(PipelineHelper<P_IN, Double> helper, boolean flattenTree) {
+    protected <P_IN> Node<Double> collect(PipelineHelper<P_IN, Double> helper, boolean flattenTree, IntFunction<Double[]> generator) {
         return NodeUtils.doubleCollect(helper, flattenTree);
     }
 
     @Override
-    protected <P_IN> Node<Double> flatten(PipelineHelper<P_IN, Double> helper, Node<Double> node) {
+    protected Node<Double> flatten(Node<Double> node, IntFunction<Double[]> generator) {
         return NodeUtils.doubleFlatten((Node.OfDouble) node);
     }
 
     @Override
     protected <P_IN> Spliterator<Double> wrap(PipelineHelper<P_IN, Double> ph,
-                                              Spliterator<P_IN> spliterator,
                                               boolean isParallel) {
-        Spliterator.OfDouble wrappingSpliterator = new StreamSpliterators.
-                DoubleWrappingSpliterator<>(ph, spliterator, isParallel);
-        if (isParallel) {
-            return wrappingSpliterator;
-        }
-        else {
-            return new Spliterators.AbstractDoubleSpliterator(wrappingSpliterator.estimateSize(),
-                                                              wrappingSpliterator.characteristics()) {
-                @Override
-                public boolean tryAdvance(DoubleConsumer action) {
-                    return wrappingSpliterator.tryAdvance(action);
-                }
-
-                @Override
-                public void forEachRemaining(DoubleConsumer action) {
-                    wrappingSpliterator.forEachRemaining(action);
-                }
-            };
-        }
+        return new StreamSpliterators.DoubleWrappingSpliterator<>(ph, null, isParallel);
     }
 
     @Override
@@ -119,11 +110,6 @@
     }
 
     @Override
-    protected <T> AbstractPipeline<?, T, ?> stream(Supplier<? extends Spliterator<?>> supplier, int flags) {
-        return new DoublePipeline(supplier, flags);
-    }
-
-    @Override
     protected void forEachWithCancel(Spliterator<Double> spliterator, Sink<Double> sink) {
         Spliterator.OfDouble spl = adapt(spliterator);
         DoubleConsumer adaptedSink = adapt(sink);
--- a/src/share/classes/java/util/stream/ForEachOps.java	Thu Mar 28 18:32:00 2013 -0700
+++ b/src/share/classes/java/util/stream/ForEachOps.java	Fri Mar 29 21:57:00 2013 +0100
@@ -525,7 +525,9 @@
                         task.helper.into(task.action, task.spliterator);
                     }
                     else {
-                        Node.Builder<T> nb = task.helper.makeNodeBuilder(task.helper.exactOutputSizeIfKnown(task.spliterator));
+                        Node.Builder<T> nb = task.helper.makeNodeBuilder(
+                                task.helper.exactOutputSizeIfKnown(task.spliterator),
+                                AbstractPipeline.objectArrayGenerator());
                         task.node = task.helper.into(nb, task.spliterator).build();
                     }
                     task.tryComplete();
--- a/src/share/classes/java/util/stream/IntPipeline.java	Thu Mar 28 18:32:00 2013 -0700
+++ b/src/share/classes/java/util/stream/IntPipeline.java	Fri Mar 29 21:57:00 2013 +0100
@@ -54,10 +54,20 @@
     /**
      * Constructor for the head of a stream pipeline.
      *
+     * @param source {@code Supplier<Spliterator>} describing the stream source
+     * @param sourceFlags The source flags for the stream source, described in {@link StreamOpFlag}
+     */
+    public IntPipeline(Supplier<? extends Spliterator<Integer>> source, int sourceFlags) {
+        super(source, sourceFlags);
+    }
+
+    /**
+     * Constructor for the head of a stream pipeline.
+     *
      * @param source {@code Spliterator} describing the stream source
      * @param sourceFlags The source flags for the stream source, described in {@link StreamOpFlag}
      */
-    public IntPipeline(Supplier<? extends Spliterator<Integer>> source, int sourceFlags) {
+    public IntPipeline(Spliterator<Integer> source, int sourceFlags) {
         super(source, sourceFlags);
     }
 
@@ -79,38 +89,19 @@
     }
 
     @Override
-    protected <P_IN> Node<Integer> collect(PipelineHelper<P_IN, Integer> helper, boolean flattenTree) {
+    protected <P_IN> Node<Integer> collect(PipelineHelper<P_IN, Integer> helper, boolean flattenTree, IntFunction<Integer[]> generator) {
         return NodeUtils.intCollect(helper, flattenTree);
     }
 
     @Override
-    protected <P_IN> Node<Integer> flatten(PipelineHelper<P_IN, Integer> helper, Node<Integer> node) {
+    protected Node<Integer> flatten(Node<Integer> node, IntFunction<Integer[]> generator) {
         return NodeUtils.intFlatten((Node.OfInt) node);
     }
 
     @Override
     protected <P_IN> Spliterator<Integer> wrap(PipelineHelper<P_IN, Integer> ph,
-                                               Spliterator<P_IN> spliterator,
                                                boolean isParallel) {
-        Spliterator.OfInt wrappingSpliterator = new StreamSpliterators.
-                IntWrappingSpliterator<>(ph, spliterator, isParallel);
-        if (isParallel) {
-            return wrappingSpliterator;
-        }
-        else {
-            return new Spliterators.AbstractIntSpliterator(wrappingSpliterator.estimateSize(),
-                                                           wrappingSpliterator.characteristics()) {
-                @Override
-                public boolean tryAdvance(IntConsumer action) {
-                    return wrappingSpliterator.tryAdvance(action);
-                }
-
-                @Override
-                public void forEachRemaining(IntConsumer action) {
-                    wrappingSpliterator.forEachRemaining(action);
-                }
-            };
-        }
+        return new StreamSpliterators.IntWrappingSpliterator<>(ph, null, isParallel);
     }
 
     @Override
@@ -119,11 +110,6 @@
     }
 
     @Override
-    protected <T> AbstractPipeline<?, T, ?> stream(Supplier<? extends Spliterator<?>> supplier, int flags) {
-        return new IntPipeline(supplier, flags);
-    }
-
-    @Override
     protected void forEachWithCancel(Spliterator<Integer> spliterator, Sink<Integer> sink) {
         Spliterator.OfInt spl = adapt(spliterator);
         IntConsumer adaptedSink = adapt(sink);
--- a/src/share/classes/java/util/stream/IntermediateOp.java	Thu Mar 28 18:32:00 2013 -0700
+++ b/src/share/classes/java/util/stream/IntermediateOp.java	Fri Mar 29 21:57:00 2013 +0100
@@ -24,6 +24,8 @@
  */
 package java.util.stream;
 
+import java.util.function.IntFunction;
+
 /**
  * An operation in a stream pipeline that takes a stream as input and produces
  * a stream, possibly of a different type, as output.  An intermediate operation
@@ -54,7 +56,7 @@
  * parallelizable, but are not amenable to the automatic parallelization of
  * stateless operations.  Accordingly, a stateful operation must provide its own
  * parallel execution implementation
- * ({@link StatefulOp#evaluateParallel(PipelineHelper)}).
+ * ({@link IntermediateOp#evaluateParallel(PipelineHelper}).
  *
  * @apiNote
  * As an example, consider the stream pipeline:
@@ -119,7 +121,7 @@
 
     /**
      * Returns whether this operation is stateful or not.  If it is stateful,
-     * then the method {@link #evaluateParallel(PipelineHelper)} must be
+     * then the method {@link #evaluateParallel(PipelineHelper} must be
      * overridden.
      *
      * @implSpec The default implementation returns {@code false}.
@@ -158,10 +160,10 @@
      * {@link UnsupportedOperationException}
      *
      * @param helper the pipeline helper
-     * @param <P_IN> the type of elements in the pipeline source
+     * @param generator
      * @return a {@code Node} describing the result of the evaluation
      */
-    default <P_IN> Node<E_OUT> evaluateParallel(PipelineHelper<P_IN, E_OUT> helper) {
+    default <P_IN> Node<E_OUT> evaluateParallel(PipelineHelper<P_IN, E_OUT> helper, IntFunction<E_OUT[]> generator) {
         throw new UnsupportedOperationException("Parallel evaluation is not supported");
     }
 }
--- a/src/share/classes/java/util/stream/LongPipeline.java	Thu Mar 28 18:32:00 2013 -0700
+++ b/src/share/classes/java/util/stream/LongPipeline.java	Fri Mar 29 21:57:00 2013 +0100
@@ -55,10 +55,20 @@
     /**
      * Constructor for the head of a stream pipeline.
      *
+     * @param source {@code Supplier<Spliterator>} describing the stream source
+     * @param sourceFlags The source flags for the stream source, described in {@link StreamOpFlag}
+     */
+    public LongPipeline(Supplier<? extends Spliterator<Long>> source, int sourceFlags) {
+        super(source, sourceFlags);
+    }
+
+    /**
+     * Constructor for the head of a stream pipeline.
+     *
      * @param source {@code Spliterator} describing the stream source
      * @param sourceFlags The source flags for the stream source, described in {@link StreamOpFlag}
      */
-    public LongPipeline(Supplier<? extends Spliterator<Long>> source, int sourceFlags) {
+    public LongPipeline(Spliterator<Long> source, int sourceFlags) {
         super(source, sourceFlags);
     }
 
@@ -80,38 +90,19 @@
     }
 
     @Override
-    protected <P_IN> Node<Long> collect(PipelineHelper<P_IN, Long> helper, boolean flattenTree) {
+    protected <P_IN> Node<Long> collect(PipelineHelper<P_IN, Long> helper, boolean flattenTree, IntFunction<Long[]> generator) {
         return NodeUtils.longCollect(helper, flattenTree);
     }
 
     @Override
-    protected <P_IN> Node<Long> flatten(PipelineHelper<P_IN, Long> helper, Node<Long> node) {
+    protected Node<Long> flatten(Node<Long> node, IntFunction<Long[]> generator) {
         return NodeUtils.longFlatten((Node.OfLong) node);
     }
 
     @Override
     protected <P_IN> Spliterator<Long> wrap(PipelineHelper<P_IN, Long> ph,
-                                            Spliterator<P_IN> spliterator,
                                             boolean isParallel) {
-        Spliterator.OfLong wrappingSpliterator = new StreamSpliterators.
-                LongWrappingSpliterator<>(ph, spliterator, isParallel);
-        if (isParallel) {
-            return wrappingSpliterator;
-        }
-        else {
-            return new Spliterators.AbstractLongSpliterator(wrappingSpliterator.estimateSize(),
-                                                            wrappingSpliterator.characteristics()) {
-                @Override
-                public boolean tryAdvance(LongConsumer action) {
-                    return wrappingSpliterator.tryAdvance(action);
-                }
-
-                @Override
-                public void forEachRemaining(LongConsumer action) {
-                    wrappingSpliterator.forEachRemaining(action);
-                }
-            };
-        }
+        return new StreamSpliterators.LongWrappingSpliterator<>(ph, null, isParallel);
     }
 
     @Override
@@ -120,11 +111,6 @@
     }
 
     @Override
-    protected <T> AbstractPipeline<?, T, ?> stream(Supplier<? extends Spliterator<?>> supplier, int flags) {
-        return new LongPipeline(supplier, flags);
-    }
-
-    @Override
     protected void forEachWithCancel(Spliterator<Long> spliterator, Sink<Long> sink) {
         Spliterator.OfLong spl = adapt(spliterator);
         LongConsumer adaptedSink =  adapt(sink);
--- a/src/share/classes/java/util/stream/NodeUtils.java	Thu Mar 28 18:32:00 2013 -0700
+++ b/src/share/classes/java/util/stream/NodeUtils.java	Fri Mar 29 21:57:00 2013 +0100
@@ -57,28 +57,27 @@
      * to produce a flat {@code Node} whose content is an array.
      *
      * @param helper the pipeline helper capturing the pipeline.
-     * @param flattenTree if true the returned {@link Node} is flat and has no children, otherwise
-     *                    the {@link Node} may be a root node in a tree whose shape mirrors that of the
+     * @param flattenTree if true the returned {@link java.util.stream.Node} is flat and has no children, otherwise
+     *                    the {@link java.util.stream.Node} may be a root node in a tree whose shape mirrors that of the
      *                    parallel computation.
-     * @param <P_IN> type of input elements to the pipeline
-     * @param <P_OUT> type of output elements from the pipeline
+     * @param generator
      * @return the {@link Node} encapsulating the output elements.
      */
     public static <P_IN, P_OUT> Node<P_OUT> collect(PipelineHelper<P_IN, P_OUT> helper,
-                                                    boolean flattenTree) {
+                                                    boolean flattenTree, IntFunction<P_OUT[]> generator) {
         Spliterator<P_IN> spliterator = helper.sourceSpliterator();
         long size = helper.exactOutputSizeIfKnown(spliterator);
         if (size >= 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) {
             if (size >= Streams.MAX_ARRAY_SIZE)
                 throw new IllegalArgumentException("Stream size exceeds max array size");
-            P_OUT[] array = helper.arrayGenerator().apply((int) size);
+            P_OUT[] array = generator.apply((int) size);
             new SizedCollectorTask<>(spliterator, helper, array).invoke();
             return Nodes.node(array);
         } else {
-            Node<P_OUT> node = new CollectorTask<>(helper).invoke();
+            Node<P_OUT> node = new CollectorTask<>(helper, generator, spliterator).invoke();
 
             // @@@ using default F/J pool, will that be different from that used by helper.invoke?
-            return flattenTree ? flatten(node, helper.arrayGenerator()) : node;
+            return flattenTree ? flatten(node, generator) : node;
         }
     }
 
@@ -141,7 +140,7 @@
             return Nodes.intNode(array);
         }
         else {
-            Node.OfInt node = new IntCollectorTask<>(helper).invoke();
+            Node.OfInt node = new IntCollectorTask<>(helper, spliterator).invoke();
 
             // @@@ using default F/J pool, will that be different from that used by helper.invoke?
             return flattenTree ? intFlatten(node) : node;
@@ -205,7 +204,7 @@
             return Nodes.longNode(array);
         }
         else {
-            Node.OfLong node = new LongCollectorTask<>(helper).invoke();
+            Node.OfLong node = new LongCollectorTask<>(helper, spliterator).invoke();
 
             // @@@ using default F/J pool, will that be different from that used by helper.invoke?
             return flattenTree ? longFlatten(node) : node;
@@ -269,7 +268,7 @@
             return Nodes.doubleNode(array);
         }
         else {
-            Node.OfDouble node = new DoubleCollectorTask<>(helper).invoke();
+            Node.OfDouble node = new DoubleCollectorTask<>(helper, spliterator).invoke();
 
             // @@@ using default F/J pool, will that be different from that used by helper.invoke?
             return flattenTree ? doubleFlatten(node) : node;
@@ -302,15 +301,18 @@
 
     private static final class CollectorTask<T, U> extends AbstractTask<T, U, Node<U>, CollectorTask<T, U>> {
         private final PipelineHelper<T, U> helper;
+        private final IntFunction<U[]> generator;
 
-        CollectorTask(PipelineHelper<T, U> helper) {
-            super(helper);
+        CollectorTask(PipelineHelper<T, U> helper, IntFunction<U[]> generator, Spliterator<T> spliterator) {
+            super(helper, spliterator, AbstractTask.suggestTargetSize(spliterator.estimateSize()));
             this.helper = helper;
+            this.generator = generator;
         }
 
         CollectorTask(CollectorTask<T, U> parent, Spliterator<T> spliterator) {
             super(parent, spliterator);
             helper = parent.helper;
+            generator = parent.generator;
         }
 
         @Override
@@ -321,7 +323,7 @@
         @Override
         protected Node<U> doLeaf() {
             Node.Builder<U> builder = Nodes.makeBuilder(helper.exactOutputSizeIfKnown(spliterator),
-                                                        helper.arrayGenerator());
+                                                        generator);
             return helper.into(builder, spliterator).build();
         }
 
@@ -487,8 +489,8 @@
     private static final class IntCollectorTask<T> extends AbstractTask<T, Integer, Node.OfInt, IntCollectorTask<T>> {
         private final PipelineHelper<T, Integer> helper;
 
-        IntCollectorTask(PipelineHelper<T, Integer> helper) {
-            super(helper);
+        IntCollectorTask(PipelineHelper<T, Integer> helper, Spliterator<T> spliterator) {
+            super(helper, spliterator, AbstractTask.suggestTargetSize(spliterator.estimateSize()));
             this.helper = helper;
         }
 
@@ -667,8 +669,8 @@
     private static final class LongCollectorTask<T> extends AbstractTask<T, Long, Node.OfLong, LongCollectorTask<T>> {
         private final PipelineHelper<T, Long> helper;
 
-        LongCollectorTask(PipelineHelper<T, Long> helper) {
-            super(helper);
+        LongCollectorTask(PipelineHelper<T, Long> helper, Spliterator<T> spliterator) {
+            super(helper, spliterator, AbstractTask.suggestTargetSize(spliterator.estimateSize()));
             this.helper = helper;
         }
 
@@ -846,8 +848,8 @@
     private static final class DoubleCollectorTask<T> extends AbstractTask<T, Double, Node.OfDouble, DoubleCollectorTask<T>> {
         private final PipelineHelper<T, Double> helper;
 
-        DoubleCollectorTask(PipelineHelper<T, Double> helper) {
-            super(helper);
+        DoubleCollectorTask(PipelineHelper<T, Double> helper, Spliterator<T> spliterator) {
+            super(helper, spliterator, AbstractTask.suggestTargetSize(spliterator.estimateSize()));
             this.helper = helper;
         }
 
--- a/src/share/classes/java/util/stream/PipelineHelper.java	Thu Mar 28 18:32:00 2013 -0700
+++ b/src/share/classes/java/util/stream/PipelineHelper.java	Fri Mar 29 21:57:00 2013 +0100
@@ -43,7 +43,7 @@
  * by this {@code PipelineHelper}.  The {@code PipelineHelper} is passed to the
  * {@link TerminalOp#evaluateParallel(PipelineHelper)},
  * {@link TerminalOp#evaluateSequential(PipelineHelper)}, and
- * {@link StatefulOp#evaluateParallel(PipelineHelper)}, methods, which can use
+ * {@link IntermediateOp#evaluateParallel(PipelineHelper}, methods, which can use
  * the {@code PipelineHelper} to access the source {@code Spliterator} for the
  * pipeline, information about the pipeline such as input shape, output shape,
  * stream flags, and size, and use the helper methods such as
@@ -57,18 +57,6 @@
 interface PipelineHelper<P_IN, P_OUT> {
 
     /**
-     * Gets the {@code StreamShape} describing the input shape of the pipeline
-     * @return The input shape of the pipeline
-     */
-    StreamShape getInputShape();
-
-    /**
-     * Gets the {@code StreamShape} describing the output shape of the pipeline
-     * @return The output shape of the pipeline
-     */
-    StreamShape getOutputShape();
-
-    /**
      * Gets the combined stream and operation flags for the output of the
      * pipeline.  This will incorporate stream flags from the stream source, all
      * the intermediate operations and the terminal operation.
@@ -80,17 +68,6 @@
     int getStreamAndOpFlags();
 
     /**
-     * Gets the operation flags for the terminal operation.
-     *
-     * @return the operation flags for the terminal operation.
-     * @see StreamOpFlag
-     */
-    // @@@ Specifying this concisely is somewhat complicated since since the actual terminal operation flags
-    //     are masked by StreamOpFlag.UPSTREAM_TERMINAL_OP_MASK as the flags propagate upstream through parallel
-    //     pipeline chunks
-    int getTerminalOpFlags();
-
-    /**
      * Returns whether this pipeline is parallel or sequential
      *
      * @return true if the pipeline is a parallel pipeline, otherwise false
@@ -191,14 +168,16 @@
      * Constructs a @{link Node.Builder} compatible with the output shape of
      * this {@code PipelineHelper}
      *
+     *
      * @param exactSizeIfKnown if >=0 then a builder will be created that has a
      *        fixed capacity of exactly sizeIfKnown elements; if < 0 then the
      *        builder has variable capacity.  A fixed capacity builder will fail
      *        if an element is added and the builder has reached capacity.
+     * @param generator
      * @return A {@code Node.Builder} compatible with the output shape of this
      *         {@code PipelineHelper}
      */
-    Node.Builder<P_OUT> makeNodeBuilder(long exactSizeIfKnown);
+    Node.Builder<P_OUT> makeNodeBuilder(long exactSizeIfKnown, IntFunction<P_OUT[]> generator);
 
     /**
      * Collects all output elements resulting from applying the pipeline stages
@@ -218,16 +197,10 @@
      *        {@code Node} returned will contain no children, otherwise the
      *        {@code Node} may represent the root in a tree that reflects the
      *        shape of the computation tree.
+     * @param generator
      * @return the {@code Node} containing all output elements
      */
-    Node<P_OUT> collectOutput(boolean flatten);
-
-    /**
-     * Gets an array factory associated with the output type of this pipeline.
-     *
-     * @return a factory for arrays of the output type of this pipeline.
-     */
-    IntFunction<P_OUT[]> arrayGenerator();
+    Node<P_OUT> collectOutput(boolean flatten, IntFunction<P_OUT[]> generator);
 
     /**
      * Collects all output elements resulting from the applying the pipeline
@@ -236,7 +209,7 @@
      * output elements will respect the encounter order of the source stream,
      * and all computation will happen in the invoking thread.
      * <p>
-     * Implementations of {@link StatefulOp#evaluateParallel(PipelineHelper)}
+     * Implementations of {@link IntermediateOp#evaluateParallel(PipelineHelper}
      * can defer to this method if a sequential implementation is acceptable.
      *
      * @implSpec
@@ -252,7 +225,10 @@
      *
      * @param op An {@code IntermediateOp} representing the final stage in the
      *        pipeline, typically a {@code StatefulOp}
+     * @param sourceSpliterator The spliterator for the source
+     * @param generator
      * @return A {@code Node} containing the output of the stream pipeline
      */
-    Node<P_OUT> evaluateSequential(IntermediateOp<P_OUT, P_OUT> op);
+    Node<P_OUT> evaluateSequential(IntermediateOp<P_OUT, P_OUT> op,
+                                   Spliterator<P_IN> sourceSpliterator, IntFunction<P_OUT[]> generator);
 }
--- a/src/share/classes/java/util/stream/ReferencePipeline.java	Thu Mar 28 18:32:00 2013 -0700
+++ b/src/share/classes/java/util/stream/ReferencePipeline.java	Fri Mar 29 21:57:00 2013 +0100
@@ -57,7 +57,7 @@
     /**
      * Constructor for the head of a stream pipeline.
      *
-     * @param source {@code Spliterator} describing the stream source
+     * @param source {@code Supplier<Spliterator>} describing the stream source
      * @param sourceFlags The source flags for the stream source, described in {@link StreamOpFlag}
      */
     public ReferencePipeline(Supplier<? extends Spliterator<?>> source,
@@ -66,6 +66,16 @@
     }
 
     /**
+     * Constructor for the head of a stream pipeline.
+     *
+     * @param source {@code Spliterator} describing the stream source
+     * @param sourceFlags The source flags for the stream source, described in {@link StreamOpFlag}
+     */
+    public ReferencePipeline(Spliterator<?> source, int sourceFlags) {
+        super(source, sourceFlags);
+    }
+
+    /**
      * Constructor for appending an intermediate operation onto an existing pipeline.
      *
      * @param upstream the upstream element source.
@@ -83,38 +93,19 @@
     }
 
     @Override
-    protected <P_IN> Node<U> collect(PipelineHelper<P_IN, U> helper, boolean flattenTree) {
-        return NodeUtils.collect(helper, flattenTree);
+    protected <P_IN> Node<U> collect(PipelineHelper<P_IN, U> helper, boolean flattenTree, IntFunction<U[]> generator) {
+        return NodeUtils.collect(helper, flattenTree, generator);
     }
 
     @Override
-    protected <P_IN> Node<U> flatten(PipelineHelper<P_IN, U> helper, Node<U> node) {
-        return NodeUtils.flatten(node, helper.arrayGenerator());
+    protected Node<U> flatten(Node<U> node, IntFunction<U[]> generator) {
+        return NodeUtils.flatten(node, generator);
     }
 
     @Override
     protected <P_IN> Spliterator<U> wrap(PipelineHelper<P_IN, U> ph,
-                                         Spliterator<P_IN> spliterator,
                                          boolean isParallel) {
-        Spliterator<U> wrappingSpliterator = new StreamSpliterators.
-                WrappingSpliterator<>(ph, spliterator, isParallel);
-        if (isParallel) {
-            return wrappingSpliterator;
-        }
-        else {
-            return new Spliterators.AbstractSpliterator<U>(wrappingSpliterator.estimateSize(),
-                                                           wrappingSpliterator.characteristics()) {
-                @Override
-                public boolean tryAdvance(Consumer<? super U> action) {
-                    return wrappingSpliterator.tryAdvance(action);
-                }
-
-                @Override
-                public void forEachRemaining(Consumer<? super U> action) {
-                    wrappingSpliterator.forEachRemaining(action);
-                }
-            };
-        }
+        return new StreamSpliterators.WrappingSpliterator<>(ph, null, isParallel);
     }
 
     @Override
@@ -123,11 +114,6 @@
     }
 
     @Override
-    protected <E_OUT> AbstractPipeline<?, E_OUT, ?> stream(Supplier<? extends Spliterator<?>> supplier, int flags) {
-        return new ReferencePipeline<>(supplier, flags);
-    }
-
-    @Override
     protected void forEachWithCancel(Spliterator<U> spliterator, Sink<U> sink) {
         while (!sink.cancellationRequested() && spliterator.tryAdvance(sink)) { }
     }
--- a/src/share/classes/java/util/stream/SliceOp.java	Thu Mar 28 18:32:00 2013 -0700
+++ b/src/share/classes/java/util/stream/SliceOp.java	Fri Mar 29 21:57:00 2013 +0100
@@ -28,6 +28,7 @@
 import java.util.List;
 import java.util.Spliterator;
 import java.util.concurrent.CountedCompleter;
+import java.util.function.IntFunction;
 
 /**
  * A {@link StatefulOp} that transforms a stream into a subsequence of itself.
@@ -204,7 +205,7 @@
 //    }
 
     @Override
-    public <S> Node<T> evaluateParallel(PipelineHelper<S, T> helper) {
+    public <S> Node<T> evaluateParallel(PipelineHelper<S, T> helper, IntFunction<T[]> generator) {
         // Parallel strategy -- two cases
         // IF we have full size information
         // - decompose, keeping track of each leaf's (offset, size)
@@ -224,7 +225,7 @@
 //        if (size >= 0 && helper.getOutputSizeIfKnown() == size && spliterator.isPredictableSplits())
 //            return helper.invoke(new SizedSliceTask<>(helper, skip, getFinalSize(helper, spliterator)));
 //        else
-              return new SliceTask<>(this, helper, skip, limit).invoke();
+              return new SliceTask<>(this, helper, generator, skip, limit).invoke();
     }
 
     @Override
@@ -240,14 +241,16 @@
      */
     private static final class SliceTask<S, T> extends AbstractShortCircuitTask<S, T, Node<T>, SliceTask<S, T>> {
         private final SliceOp<T> op;
+        private final IntFunction<T[]> generator;
         private final long targetOffset, targetSize;
         private long thisNodeSize;
 
         private volatile boolean completed;
 
-        SliceTask(SliceOp<T> op, PipelineHelper<S, T> helper, long offset, long size) {
+        SliceTask(SliceOp<T> op, PipelineHelper<S, T> helper, IntFunction<T[]> generator, long offset, long size) {
             super(helper);
             this.op = op;
+            this.generator = generator;
             this.targetOffset = offset;
             this.targetSize = size;
         }
@@ -255,6 +258,7 @@
         SliceTask(SliceTask<S, T> parent, Spliterator<S> spliterator) {
             super(parent, spliterator);
             this.op = parent.op;
+            this.generator = parent.generator;
             this.targetOffset = parent.targetOffset;
             this.targetSize = parent.targetSize;
         }
@@ -272,10 +276,10 @@
         @Override
         protected final Node<T> doLeaf() {
             if (isRoot()) {
-                return helper.evaluateSequential(op);
+                return helper.evaluateSequential(op, spliterator, generator);
             }
             else {
-                Node<T> node = helper.into(helper.makeNodeBuilder(-1),
+                Node<T> node = helper.into(helper.makeNodeBuilder(-1, generator),
                                            spliterator).build();
                 thisNodeSize = node.count();
                 completed = true;
@@ -362,7 +366,7 @@
             if (skipLeft == 0 && skipRight == 0)
                 return input;
             else {
-                return Nodes.createTruncatedNode(input, skipLeft, thisNodeSize - skipRight, helper.arrayGenerator());
+                return Nodes.createTruncatedNode(input, skipLeft, thisNodeSize - skipRight, generator);
             }
         }
     }
--- a/src/share/classes/java/util/stream/SortedOp.java	Thu Mar 28 18:32:00 2013 -0700
+++ b/src/share/classes/java/util/stream/SortedOp.java	Fri Mar 29 21:57:00 2013 +0100
@@ -30,6 +30,7 @@
 import java.util.Comparators;
 import java.util.Objects;
 import java.util.concurrent.ForkJoinTask;
+import java.util.function.IntFunction;
 
 
 /**
@@ -96,15 +97,15 @@
         }
 
         @Override
-        public <P_IN> Node<T> evaluateParallel(PipelineHelper<P_IN, T> helper) {
+        public <P_IN> Node<T> evaluateParallel(PipelineHelper<P_IN, T> helper, IntFunction<T[]> generator) {
             // If the input is already naturally sorted and this operation
             // naturally sorts then collect the output
             if (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags()) && isNaturalSort) {
-                return helper.collectOutput(false);
+                return helper.collectOutput(false, generator);
             }
             else {
                 // @@@ Weak two-pass parallel implementation; parallel collect, parallel sort
-                T[] flattenedData = helper.collectOutput(true).asArray(helper.arrayGenerator());
+                T[] flattenedData = helper.collectOutput(true, generator).asArray(generator);
                 Arrays.parallelSort(flattenedData, comparator);
                 return Nodes.node(flattenedData);
             }
@@ -130,12 +131,12 @@
         }
 
         @Override
-        public <P_IN> Node<Integer> evaluateParallel(PipelineHelper<P_IN, Integer> helper) {
+        public <P_IN> Node<Integer> evaluateParallel(PipelineHelper<P_IN, Integer> helper, IntFunction<Integer[]> generator) {
             if (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags())) {
-                return helper.collectOutput(false);
+                return helper.collectOutput(false, generator);
             }
             else {
-                Node.OfInt n = (Node.OfInt) helper.collectOutput(true);
+                Node.OfInt n = (Node.OfInt) helper.collectOutput(true, generator);
 
                 int[] content = n.asIntArray();
                 Arrays.parallelSort(content);
@@ -164,12 +165,12 @@
         }
 
         @Override
-        public <P_IN> Node<Long> evaluateParallel(PipelineHelper<P_IN, Long> helper) {
+        public <P_IN> Node<Long> evaluateParallel(PipelineHelper<P_IN, Long> helper, IntFunction<Long[]> generator) {
             if (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags())) {
-                return helper.collectOutput(false);
+                return helper.collectOutput(false, generator);
             }
             else {
-                Node.OfLong n = (Node.OfLong) helper.collectOutput(true);
+                Node.OfLong n = (Node.OfLong) helper.collectOutput(true, generator);
 
                 long[] content = n.asLongArray();
                 Arrays.parallelSort(content);
@@ -198,12 +199,12 @@
         }
 
         @Override
-        public <P_IN> Node<Double> evaluateParallel(PipelineHelper<P_IN, Double> helper) {
+        public <P_IN> Node<Double> evaluateParallel(PipelineHelper<P_IN, Double> helper, IntFunction<Double[]> generator) {
             if (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags())) {
-                return helper.collectOutput(false);
+                return helper.collectOutput(false, generator);
             }
             else {
-                Node.OfDouble n = (Node.OfDouble) helper.collectOutput(true);
+                Node.OfDouble n = (Node.OfDouble) helper.collectOutput(true, generator);
 
                 double[] content = n.asDoubleArray();
                 Arrays.parallelSort(content);
--- a/src/share/classes/java/util/stream/StatefulOp.java	Thu Mar 28 18:32:00 2013 -0700
+++ b/src/share/classes/java/util/stream/StatefulOp.java	Fri Mar 29 21:57:00 2013 +0100
@@ -24,6 +24,8 @@
  */
 package java.util.stream;
 
+import java.util.function.IntFunction;
+
 /**
  * A stateful intermediate stream operation ({@link IntermediateOp}).
  * <em>Stateful</em> means that state is accumulated as elements are processed.
@@ -40,7 +42,7 @@
  * parallelizable, but are not amenable to the automatic parallelization of
  * stateless operations.  Accordingly, a stateful operation must provide its
  * own parallel execution implementation
- * ({@link #evaluateParallel(PipelineHelper)}) as well as
+ * ({@link IntermediateOp#evaluateParallel(PipelineHelper}) as well as
  * {@link IntermediateOp#wrapSink(int, Sink)}.
  *
  * @param <E> Type of input and output elements.
@@ -71,5 +73,5 @@
      * is available as
      * {@link PipelineHelper#evaluateSequential(IntermediateOp)}.
      */
-    <P_IN> Node<E> evaluateParallel(PipelineHelper<P_IN, E> helper);
+    <P_IN> Node<E> evaluateParallel(PipelineHelper<P_IN, E> helper, IntFunction<E[]> generator);
 }
--- a/src/share/classes/java/util/stream/StreamOpFlag.java	Thu Mar 28 18:32:00 2013 -0700
+++ b/src/share/classes/java/util/stream/StreamOpFlag.java	Fri Mar 29 21:57:00 2013 +0100
@@ -27,6 +27,7 @@
 import java.util.EnumMap;
 import java.util.Map;
 import java.util.Spliterator;
+import java.util.StringJoiner;
 
 /**
  * Flags corresponding to characteristics of streams and operations. Flags are
--- a/src/share/classes/java/util/stream/StreamSpliterators.java	Thu Mar 28 18:32:00 2013 -0700
+++ b/src/share/classes/java/util/stream/StreamSpliterators.java	Fri Mar 29 21:57:00 2013 +0100
@@ -40,6 +40,12 @@
  */
 class StreamSpliterators {
 
+    /**
+     * Abstract wrapping spliterator that binds to the spliterator of a
+     * pipeline helper on first operation.
+     * @@@ If Spliterator.SUBSIZED was propagated as a stream flag through
+     * the pipeline this class could be converted to a late-binding spliterator
+     */
     private static abstract class AbstractWrappingSpliterator<P_IN, P_OUT, T_BUFFER extends AbstractSpinedBuffer<P_OUT>>
             implements Spliterator<P_OUT> {
         // True if this spliterator supports splitting
@@ -48,7 +54,8 @@
         final PipelineHelper<P_IN, P_OUT> ph;
 
         // The source spliterator whose elements are traversed and pushed through bufferSink
-        final Spliterator<P_IN> spliterator;
+        // For the top-level spliterator is null until the spliterator is required
+        Spliterator<P_IN> spliterator;
 
         // The source of the sink chain for the pipeline where the last sink
         // is connected to buffer
@@ -79,11 +86,17 @@
             this.isParallel = parallel;
         }
 
-        boolean doAdvance() {
+        final void init() {
+            if (spliterator == null)
+                spliterator = ph.sourceSpliterator();
+        }
+
+        final boolean doAdvance() {
             if (buffer == null) {
                 if (finished)
                     return false;
 
+                init();
                 initPartialTraversalState();
                 nextToConsume = 0;
                 bufferSink.begin(spliterator.getExactSizeIfKnown());
@@ -108,6 +121,8 @@
         @Override
         public Spliterator<P_OUT> trySplit() {
             if (isParallel && !finished) {
+                init();
+
                 Spliterator<P_IN> split = spliterator.trySplit();
                 return (split == null) ? null : wrap(split);
             }
@@ -130,20 +145,31 @@
         }
 
         @Override
-        public long estimateSize() {
+        public final long estimateSize() {
+            init();
+
             return StreamOpFlag.SIZED.isKnown(ph.getStreamAndOpFlags()) ? spliterator.estimateSize() : Long.MAX_VALUE;
         }
 
         @Override
-        public long getExactSizeIfKnown() {
+        public final long getExactSizeIfKnown() {
+            init();
+
             return StreamOpFlag.SIZED.isKnown(ph.getStreamAndOpFlags()) ? spliterator.getExactSizeIfKnown() : -1;
         }
 
         @Override
-        public int characteristics() {
+        public final int characteristics() {
+            init();
+
             // Get the characteristics from the pipeline
             int c = StreamOpFlag.toCharacteristics(StreamOpFlag.toStreamFlags(ph.getStreamAndOpFlags()));
 
+            // @@@ determining if the source spliterator is SUBSIZED results in
+            // in the wrapping spliterator not being late-binding
+            // to fix this requires that SUBSIZED is mapped to a stream flag
+            // and propagated through the pipeline
+
             // Mask off the size and uniform characteristics and replace with those of the spliterator
             // Note that a non-uniform spliterator can change from something with an exact size to an
             // estimate for a sub-split, for example with HashSet where the size is known at the top
@@ -157,7 +183,7 @@
         }
 
         @Override
-        public String toString() {
+        public final String toString() {
             return getClass().getName() + "[" + spliterator + "]";
         }
     }
@@ -193,6 +219,8 @@
         @Override
         public void forEachRemaining(Consumer<? super P_OUT> consumer) {
             if (buffer == null && !finished) {
+                init();
+
                 ph.into((Sink<P_OUT>) consumer::accept, spliterator);
                 finished = true;
             }
@@ -239,6 +267,8 @@
         @Override
         public void forEachRemaining(IntConsumer consumer) {
             if (buffer == null && !finished) {
+                init();
+
                 ph.into((Sink.OfInt) consumer::accept, spliterator);
                 finished = true;
             }
@@ -285,6 +315,8 @@
         @Override
         public void forEachRemaining(LongConsumer consumer) {
             if (buffer == null && !finished) {
+                init();
+
                 ph.into((Sink.OfLong) consumer::accept, spliterator);
                 finished = true;
             }
@@ -331,6 +363,8 @@
         @Override
         public void forEachRemaining(DoubleConsumer consumer) {
             if (buffer == null && !finished) {
+                init();
+
                 ph.into((Sink.OfDouble) consumer::accept, spliterator);
                 finished = true;
             }
--- a/src/share/classes/java/util/stream/Streams.java	Thu Mar 28 18:32:00 2013 -0700
+++ b/src/share/classes/java/util/stream/Streams.java	Fri Mar 29 21:57:00 2013 +0100
@@ -170,7 +170,7 @@
      */
     public static<T> Stream<T> stream(Spliterator<T> spliterator) {
         Objects.requireNonNull(spliterator);
-        return new ReferencePipeline<>(() -> spliterator,
+        return new ReferencePipeline<>(spliterator,
                                        StreamOpFlag.fromCharacteristics(spliterator) & ~StreamOpFlag.IS_PARALLEL);
     }
 
@@ -198,7 +198,7 @@
      */
     public static<T> Stream<T> parallelStream(Spliterator<T> spliterator) {
         Objects.requireNonNull(spliterator);
-        return new ReferencePipeline<>(() -> spliterator,
+        return new ReferencePipeline<>(spliterator,
                                        StreamOpFlag.fromCharacteristics(spliterator) | StreamOpFlag.IS_PARALLEL);
     }
 
@@ -296,7 +296,7 @@
      * @return A new sequential {@code IntStream}
      */
     public static IntStream intStream(Spliterator.OfInt spliterator) {
-        return intStream(() -> spliterator, spliterator.characteristics());
+        return new IntPipeline<>(spliterator, spliterator.characteristics() & ~StreamOpFlag.IS_PARALLEL);
     }
 
     /**
@@ -322,7 +322,8 @@
      * @return A new parallel {@code IntStream}
      */
     public static IntStream intParallelStream(Spliterator.OfInt spliterator) {
-        return intParallelStream(() -> spliterator, spliterator.characteristics());
+        return new IntPipeline<>(spliterator,
+                                 StreamOpFlag.fromCharacteristics(spliterator.characteristics()) | StreamOpFlag.IS_PARALLEL);
     }
 
     // LongStream construction
@@ -419,7 +420,8 @@
      * @return A new sequential {@code LongStream}
      */
     public static LongStream longStream(Spliterator.OfLong spliterator) {
-        return longStream(() -> spliterator, spliterator.characteristics());
+        return new LongPipeline<>(spliterator,
+                                  StreamOpFlag.fromCharacteristics(spliterator.characteristics()) & ~StreamOpFlag.IS_PARALLEL);
     }
 
     /**
@@ -445,7 +447,8 @@
      * @return A new parallel {@code LongStream}
      */
     public static LongStream longParallelStream(Spliterator.OfLong spliterator) {
-        return longParallelStream(() -> spliterator, spliterator.characteristics());
+        return new LongPipeline<>(spliterator,
+                                  StreamOpFlag.fromCharacteristics(spliterator.characteristics()) | StreamOpFlag.IS_PARALLEL);
     }
 
     // DoubleStream construction
@@ -544,7 +547,8 @@
      * @return A new sequential {@code DoubleStream}
      */
     public static DoubleStream doubleStream(Spliterator.OfDouble spliterator) {
-        return doubleStream(() -> spliterator, spliterator.characteristics());
+        return new DoublePipeline<>(spliterator,
+                                    StreamOpFlag.fromCharacteristics(spliterator.characteristics()) & ~StreamOpFlag.IS_PARALLEL);
     }
 
     /**
@@ -570,7 +574,8 @@
      * @return A new parallel {@code DoubleStream}
      */
     public static DoubleStream doubleParallelStream(Spliterator.OfDouble spliterator) {
-        return doubleParallelStream(() -> spliterator, spliterator.characteristics());
+        return new DoublePipeline<>(spliterator,
+                                    StreamOpFlag.fromCharacteristics(spliterator.characteristics()) | StreamOpFlag.IS_PARALLEL);
     }
 
     // Infinite Stream generators
--- a/test-ng/bootlib/java/util/stream/CollectorOps.java	Thu Mar 28 18:32:00 2013 -0700
+++ b/test-ng/bootlib/java/util/stream/CollectorOps.java	Fri Mar 29 21:57:00 2013 +0100
@@ -26,6 +26,8 @@
 
 import org.testng.Assert;
 
+import java.util.function.IntFunction;
+
 public final class CollectorOps {
     private CollectorOps() { }
 
@@ -67,8 +69,8 @@
         }
 
         @Override
-        public <P_IN> Node<E_IN> evaluateParallel(PipelineHelper<P_IN, E_IN> helper) {
-            return helper.collectOutput(false);
+        public <P_IN> Node<E_IN> evaluateParallel(PipelineHelper<P_IN, E_IN> helper, IntFunction<E_IN[]> generator) {
+            return helper.collectOutput(false, generator);
         }
     }
 
@@ -82,11 +84,11 @@
         }
 
         @Override
-        public <P_IN> Node<T> evaluateParallel(PipelineHelper<P_IN, T> helper) {
+        public <P_IN> Node<T> evaluateParallel(PipelineHelper<P_IN, T> helper, IntFunction<T[]> generator) {
             int flags = helper.getStreamAndOpFlags();
 
             Assert.assertTrue(StreamOpFlag.SIZED.isKnown(flags));
-            return super.evaluateParallel(helper);
+            return super.evaluateParallel(helper, generator);
         }
 
         public static class OfInt extends TestParallelSizedOp<Integer> {
--- a/test-ng/bootlib/java/util/stream/LambdaTestHelpers.java	Thu Mar 28 18:32:00 2013 -0700
+++ b/test-ng/bootlib/java/util/stream/LambdaTestHelpers.java	Fri Mar 29 21:57:00 2013 +0100
@@ -441,4 +441,15 @@
 
         return result;
     }
+
+    public static String flagsToString(int flags) {
+        StringJoiner sj = new StringJoiner(", ", "StreamOpFlag[", "]");
+        if (StreamOpFlag.DISTINCT.isKnown(flags)) sj.add("IS_DISTINCT");
+        if (StreamOpFlag.ORDERED.isKnown(flags)) sj.add("IS_ORDERED");
+        if (StreamOpFlag.SIZED.isKnown(flags)) sj.add("IS_SIZED");
+        if (StreamOpFlag.SORTED.isKnown(flags)) sj.add("IS_SORTED");
+        if (StreamOpFlag.SHORT_CIRCUIT.isKnown(flags)) sj.add("IS_SHORT_CIRCUIT");
+        if (StreamOpFlag.PARALLEL.isKnown(flags)) sj.add("IS_PARALLEL");
+        return sj.toString();
+    }
 }
--- a/test-ng/bootlib/java/util/stream/OpTestCase.java	Thu Mar 28 18:32:00 2013 -0700
+++ b/test-ng/bootlib/java/util/stream/OpTestCase.java	Fri Mar 29 21:57:00 2013 +0100
@@ -66,7 +66,7 @@
         testScenarios.put(StreamShape.DOUBLE_VALUE, Collections.unmodifiableSet(EnumSet.allOf(DoubleStreamTestScenario.class)));
     }
 
-    @SuppressWarnings("raw")
+    @SuppressWarnings("rawtypes")
     public static int getStreamFlags(BaseStream s) {
         return ((AbstractPipeline) s).getStreamFlags();
     }
@@ -295,8 +295,8 @@
                             () -> String.format("%n%s: %s != %s", test, refResult, result));
 
                     after.accept(data);
-                } catch (AssertionError ae) {
-                    errors.add(ae);
+//                } catch (AssertionError ae) {
+//                    errors.add(ae);
                 } catch (Throwable t) {
                     errors.add(new Error(String.format("%s: %s", test, t), t));
                 }
@@ -415,13 +415,14 @@
             AbstractPipeline ap = (AbstractPipeline) out;
             StreamShape shape = ap.getOutputShape();
 
-            Node<U> node = ((AbstractPipeline) out).collectOutput(false);
+            Node<U> node = ap.collectOutput(false);
             if (refResult == null) {
                 // Sequentially collect the output that will be input to the terminal op
-                refResult = terminalF.apply((S_OUT) ap.stream(node::spliterator, StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SIZED));
+                refResult = terminalF.apply((S_OUT) createPipeline(shape, node.spliterator(),
+                                                                   StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SIZED));
             } else if (testSet.contains(TerminalTestScenario.SINGLE_SEQUENTIAL)) {
-                // @@@ Using specific stream implementation
-                S_OUT source = (S_OUT) ap.stream(node::spliterator, StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SIZED);
+                S_OUT source = (S_OUT) createPipeline(shape, node.spliterator(),
+                                                      StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SIZED);
                 BiConsumer<R, R> asserter = sequentialEqualityAsserter.apply(source);
                 R result = terminalF.apply(source);
                 LambdaTestHelpers.launderAssertion(() -> asserter.accept(refResult, result),
@@ -429,8 +430,8 @@
             }
 
             if (testSet.contains(TerminalTestScenario.SINGLE_SEQUENTIAL_PULL)) {
-                // @@@ Using specific stream implementation
-                S_OUT source = (S_OUT) ap.stream(node::spliterator, StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SIZED);
+                S_OUT source = (S_OUT) createPipeline(shape, node.spliterator(),
+                                                      StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SIZED);
                 // Force pull mode
                 source = (S_OUT) chain(source, new PullOnlyOp<U>(shape));
                 BiConsumer<R, R> asserter = sequentialEqualityAsserter.apply(source);
@@ -440,8 +441,8 @@
             }
 
             if (testSet.contains(TerminalTestScenario.SINGLE_PARALLEL)) {
-                // @@@ Using specific stream implementation
-                S_OUT source = (S_OUT) ap.stream(node::spliterator, StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SIZED | StreamOpFlag.IS_PARALLEL);
+                S_OUT source = (S_OUT) createPipeline(shape, node.spliterator(),
+                                                      StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SIZED | StreamOpFlag.IS_PARALLEL);
                 BiConsumer<R, R> asserter = parallelEqualityAsserter.apply(source);
                 R result = terminalF.apply(source);
                 LambdaTestHelpers.launderAssertion(() -> asserter.accept(refResult, result),
@@ -486,6 +487,16 @@
 
             return refResult;
         }
+
+        AbstractPipeline createPipeline(StreamShape shape, Spliterator s, int flags) {
+            switch (shape) {
+                case REFERENCE:    return new ReferencePipeline(s, flags);
+                case INT_VALUE:    return new IntPipeline(s, flags);
+                case LONG_VALUE:   return new LongPipeline(s, flags);
+                case DOUBLE_VALUE: return new DoublePipeline(s, flags);
+                default: throw new IllegalStateException("Unknown shape: " + shape);
+            }
+        }
     }
 
     public <T, R> R exerciseTerminalOps(Collection<T> data, Function<Stream<T>, R> m, R expected) {
--- a/test-ng/boottests/java/util/stream/UnorderedTest.java	Thu Mar 28 18:32:00 2013 -0700
+++ b/test-ng/boottests/java/util/stream/UnorderedTest.java	Fri Mar 29 21:57:00 2013 +0100
@@ -49,12 +49,40 @@
         testTerminal(data, terminalF, LambdaTestHelpers::assertContentsEqual);
     }
 
+    static class WrappingUnaryOperator<S> implements UnaryOperator<S> {
+
+        final boolean isLimit;
+        final UnaryOperator<S> uo;
+
+        WrappingUnaryOperator(UnaryOperator<S> uo) {
+            this(uo, false);
+        }
+
+        WrappingUnaryOperator(UnaryOperator<S> uo, boolean isLimit) {
+            this.uo = uo;
+            this.isLimit = isLimit;
+        }
+
+        @Override
+        public S apply(S s) {
+            return uo.apply(s);
+        }
+    }
+
+    static <S> WrappingUnaryOperator<S> wrap(UnaryOperator<S> uo) {
+        return new WrappingUnaryOperator<S>(uo);
+    }
+
+    static <S> WrappingUnaryOperator<S> wrap(UnaryOperator<S> uo, boolean isLimit) {
+        return new WrappingUnaryOperator<S>(uo, isLimit);
+    }
+
     @SuppressWarnings("rawtypes")
     private List permutationOfFunctions =
-            LambdaTestHelpers.perm(Arrays.<UnaryOperator<Stream<Object>>>asList(
-                    s -> s.sorted(),
-                    s -> s.distinct(),
-                    s -> s.limit(5)
+            LambdaTestHelpers.perm(Arrays.<WrappingUnaryOperator<Stream<Object>>>asList(
+                    wrap(s -> s.sorted()),
+                    wrap(s -> s.distinct()),
+                    wrap(s -> s.limit(5), true)
             ));
 
     @SuppressWarnings("unchecked")
@@ -78,11 +106,11 @@
         testIntTerminal(data, terminalF, LambdaTestHelpers::assertContentsEqual);
     }
 
-    private List<List<UnaryOperator<IntStream>>> intPermutationOfFunctions =
+    private List<List<WrappingUnaryOperator<IntStream>>> intPermutationOfFunctions =
             LambdaTestHelpers.perm(Arrays.asList(
-                    s -> s.sorted(),
-                    s -> s.distinct(),
-                    s -> s.limit(5)
+                    wrap(s -> s.sorted()),
+                    wrap(s -> s.distinct()),
+                    wrap(s -> s.limit(5), true)
             ));
 
     private <T, R> void testIntTerminal(IntStreamTestData data, Function<IntStream, R> terminalF, BiConsumer<R, R> equalityAsserter) {
@@ -105,11 +133,11 @@
         testLongTerminal(data, terminalF, LambdaTestHelpers::assertContentsEqual);
     }
 
-    private List<List<UnaryOperator<LongStream>>> longPermutationOfFunctions =
+    private List<List<WrappingUnaryOperator<LongStream>>> longPermutationOfFunctions =
             LambdaTestHelpers.perm(Arrays.asList(
-                    s -> s.sorted(),
-                    s -> s.distinct(),
-                    s -> s.limit(5)
+                    wrap(s -> s.sorted()),
+                    wrap(s -> s.distinct()),
+                    wrap(s -> s.limit(5), true)
             ));
 
     private <T, R> void testLongTerminal(LongStreamTestData data, Function<LongStream, R> terminalF, BiConsumer<R, R> equalityAsserter) {
@@ -132,11 +160,11 @@
         testDoubleTerminal(data, terminalF, LambdaTestHelpers::assertContentsEqual);
     }
 
-    private List<List<UnaryOperator<DoubleStream>>> doublePermutationOfFunctions =
+    private List<List<WrappingUnaryOperator<DoubleStream>>> doublePermutationOfFunctions =
             LambdaTestHelpers.perm(Arrays.asList(
-                    s -> s.sorted(),
-                    s -> s.distinct(),
-                    s -> s.limit(5)
+                    wrap(s -> s.sorted()),
+                    wrap(s -> s.distinct()),
+                    wrap(s -> s.limit(5), true)
             ));
 
     private <T, R> void testDoubleTerminal(DoubleStreamTestData data, Function<DoubleStream, R> terminalF, BiConsumer<R, R> equalityAsserter) {
@@ -148,10 +176,10 @@
     private <T, S extends BaseStream<T, S>, R> void testTerminal(TestData<T, S> data,
                                                                  Function<S, R> terminalF,
                                                                  BiConsumer<R, R> equalityAsserter,
-                                                                 List<List<UnaryOperator<S>>> pFunctions,
+                                                                 List<List<WrappingUnaryOperator<S>>> pFunctions,
                                                                  StreamShape shape) {
         CheckClearOrderedOp<T> checkClearOrderedOp = new CheckClearOrderedOp<>(shape);
-        for (List<UnaryOperator<S>> f : pFunctions) {
+        for (List<WrappingUnaryOperator<S>> f : pFunctions) {
             @SuppressWarnings("unchecked")
             UnaryOperator<S> fi = interpose(f, (S s) -> (S) chain(s, checkClearOrderedOp));
             withData(data).
@@ -162,7 +190,7 @@
         }
 
         CheckSetOrderedOp<T> checkSetOrderedOp = new CheckSetOrderedOp<>(shape);
-        for (List<UnaryOperator<S>> f : pFunctions) {
+        for (List<WrappingUnaryOperator<S>> f : pFunctions) {
             @SuppressWarnings("unchecked")
             UnaryOperator<S> fi = interpose(f, (S s) -> (S) chain(s, checkSetOrderedOp));
             withData(data).
@@ -214,15 +242,25 @@
         }
     }
 
-    private <T, S extends BaseStream<T, S>> UnaryOperator<S> interpose(List<UnaryOperator<S>> fs, UnaryOperator<S> fi) {
+    private <T, S extends BaseStream<T, S>> UnaryOperator<S> interpose(List<WrappingUnaryOperator<S>> fs, UnaryOperator<S> fi) {
+        int l = -1;
+        for (int i = 0; i < fs.size(); i++) {
+            if (fs.get(i).isLimit) {
+                l = i;
+            }
+        }
+
+        final int lastLimitIndex = l;
         return s -> {
-            s = fi.apply(s);
-            for (Function<S, S> f : fs) {
-                s = f.apply(s);
+            if (lastLimitIndex == -1)
                 s = fi.apply(s);
+            for (int i = 0; i < fs.size(); i++) {
+                s = fs.get(i).apply(s);
+                if (i >= lastLimitIndex) {
+                    s = fi.apply(s);
+                }
             }
             return s;
         };
     }
-
 }
--- a/test-ng/build.xml	Thu Mar 28 18:32:00 2013 -0700
+++ b/test-ng/build.xml	Fri Mar 29 21:57:00 2013 +0100
@@ -102,7 +102,7 @@
 
    <target name="test" depends="test-compile" >
         <echo>Results at: file:${test.reports.dir}/index.html</echo>
-        <testng classpathref="test.class.path" outputdir="${test.reports.dir}" >
+        <testng classpathref="test.class.path" outputdir="${test.reports.dir}" listener="org.testng.reporters.DotTestListener">
             <classfileset dir="${tests.classes.dir}" includes="**/${test.pattern}.class"/>
             <classfileset dir="${boottests.classes.dir}" includes="**/${test.pattern}.class"/>
             <jvmarg value="-Xbootclasspath/p:${boottests.classes.dir}"/>
@@ -127,7 +127,7 @@
             <jvmarg value="-ea" />
             <jvmarg value="-esa" />
             <jvmarg value="-Xverify:all" />
-            <jvmarg value="-Xmx2500m" />
+            <jvmarg value="-Xmx4000m" />
         </testng>
     </target>
 
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/test-ng/tests/org/openjdk/tests/java/util/stream/StreamParSeqTest.java	Fri Mar 29 21:57:00 2013 +0100
@@ -0,0 +1,54 @@
+/*
+ * Copyright (c) 2013, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.  Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+package org.openjdk.tests.java.util.stream;
+
+import org.testng.annotations.Test;
+
+import java.util.Arrays;
+import java.util.stream.Stream;
+
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
+@Test
+public class StreamParSeqTest {
+
+    public void testParSeq() {
+        Stream<Integer> s = Arrays.asList(1, 2, 3, 4).stream().parallel();
+        assertTrue(s.isParallel());
+
+        s = s.sequential();
+        assertFalse(s.isParallel());
+
+        s = s.sequential();
+        assertFalse(s.isParallel());
+
+        s = s.parallel();
+        assertTrue(s.isParallel());
+
+        s = s.parallel();
+        assertTrue(s.isParallel());
+    }
+}