changeset 6366:a894d11817f6

- Revert evaluation to use IntermediateOp[] rather than AbstractPipeline[] - Change stream.sequential() to use a stateful collection operation that is a no-op for sequential evaluation. For parallel evaluation the intermediate node that is produced from collection will ensure the rest of the pipeline will be evaluated sequentially. - Expanded the flag op tests to test setting, clearing and preservation of flags sequential and parallel evaluation.
author psandoz
date Fri, 02 Nov 2012 11:36:01 +0100
parents 0fe3dad5e617
children ecf2a2d21af8
files src/share/classes/java/util/streams/AbstractPipeline.java src/share/classes/java/util/streams/StreamOpFlags.java src/share/classes/java/util/streams/ValuePipeline.java src/share/classes/java/util/streams/ops/CollectorOp.java src/share/classes/java/util/streams/ops/CollectorOps.java src/share/classes/java/util/streams/ops/Node.java src/share/classes/java/util/streams/ops/Nodes.java test-ng/tests/org/openjdk/tests/java/util/streams/ops/FlagOpTest.java test-ng/tests/org/openjdk/tests/java/util/streams/ops/LimitOpTest.java test-ng/tests/org/openjdk/tests/java/util/streams/ops/StreamOpTestCase.java test-ng/tests/org/openjdk/tests/java/util/streams/ops/ToArrayOpTest.java
diffstat 11 files changed, 685 insertions(+), 179 deletions(-) [+]
line wrap: on
line diff
--- a/src/share/classes/java/util/streams/AbstractPipeline.java	Fri Nov 02 09:35:49 2012 +0100
+++ b/src/share/classes/java/util/streams/AbstractPipeline.java	Fri Nov 02 11:36:01 2012 +0100
@@ -42,10 +42,41 @@
     protected final StreamAccessor<?> source;
     protected final IntermediateOp<E_IN, E_OUT> op;
     protected final int depth;
-    protected final int combinedOpsFlags;
 
     /**
-     * If non-{@code null} then we are in serial iteration mode.
+     * Evaluation works in two modes:
+     *
+     * 1) Sequential; and
+     * 2) Parallel.
+     *
+     * Sequential evaluation is performed when a stream accessor does not support splitting or when splitting is no
+     * longer possible because some part of the pipeline has already been pulled.
+     *
+     * Parallel evaluation slices up the pipeline into a list of intermediate slices and a terminal slice.
+     * An intermediate slice consists of zero or more stateless intermediate operations and then
+     * one stateful intermediate operation at the end.
+     * The terminal slice consists of zero or more stateless intermediate operations and then
+     * one terminal operation at the end. The terminal operation is responsible for producing a result.
+     *
+     * Each intermediate slice is evaluated, in order, and in parallel to produce an intermediate list of elements
+     * that is held by a Node.
+     * That Node is converted to a StreamAccessor which becomes is the source or input into the next intermediate
+     * or terminal slice.
+     * The process repeats until the terminal slice is processed to produce a result.
+     *
+     * If an intermediate result is a Node that when converted to a stream accessor does not support splitting then all
+     * the remaining operations in the pipeline (the next operation after the stateful operation in the slice and
+     * up to and including the terminal operation) are evaluated sequentially.
+     *
+     * Combined stream and operation flags are calculated for each slice of the pipeline. When a stream accessor for
+     * a Node is created the resultant stream and operation flags produced from the intermediate stateful operation
+     * are utilized to create the stream flags for the stream accessor. This ensures stream properties are
+     * preserved and propagated for the next intermediate evaluation. In addition the SIZED property will be
+     * explicitly set, since the Node knows it's size.
+     */
+
+    /**
+     * If non-{@code null} then we are in sequential iteration mode.
      */
     private Iterator<E_OUT> iterator;
 
@@ -59,7 +90,6 @@
         this.op = null;
         this.upstream = null;
         this.depth = 0;
-        this.combinedOpsFlags = StreamOpFlags.INITIAL_OPS_VALUE;
     }
 
     /**
@@ -73,7 +103,6 @@
         this.op = Objects.requireNonNull(op);
         this.source = upstream.source;
         this.depth = upstream.depth + 1;
-        this.combinedOpsFlags = StreamOpFlags.combineOpFlags(op.getOpFlags(), upstream.combinedOpsFlags);
         assert upstream.getShape() == op.inputShape();
         assert (upstream.depth == 0) ^ (upstream.op != null);
     }
@@ -88,76 +117,100 @@
             return evaluateParallel(terminal);
         }
         else
-            return evaluateSerial(terminal);
+            return evaluateSequential(terminal);
     }
 
     protected<R> R evaluateParallel(TerminalOp<E_OUT, R> terminal) {
-        if (iterator != null) {
+        if (isPipelinePulled()) {
             // If already pulled then cannot split, revert back to sequential evaluation
-            return evaluateSerial(terminal);
+            return evaluateSequential(terminal);
         }
 
-        final AbstractPipeline[] pipes = pipes();
-        // Start from 2nd element since 1st is the head containing the source
-        int fromOp = 1;
-        int upToOp = 1;
+        final IntermediateOp[] ops = ops();
+        // Ops flags length is one greater than op array to hold initial value
+        // Given an op whose index is i then the flags input to that op are at flags[i]
+        // and the flags output from that op are at flags[i + 1]
+        final int[] opsFlags = new int[ops.length + 1];
+        int fromOp = 0;
+        int upToOp = 0;
         StreamAccessor<?> accessor = source;
         while (true) {
-            while (upToOp < pipes.length && !pipes[upToOp].op.isStateful())
+            while (upToOp < ops.length && !ops[upToOp].isStateful())
                 upToOp++;
 
-            if (upToOp < pipes.length) {
-                StatefulOp op = (StatefulOp) pipes[upToOp].op;
-                Node<?> intermediateResult = evaluateParallel(accessor, pipes, fromOp, upToOp, op);
+            if (upToOp < ops.length) {
+                StatefulOp op = (StatefulOp) ops[upToOp];
+                Node<?> intermediateResult = evaluateParallel(accessor, opsFlags, ops, fromOp, upToOp, op);
 
-                // @@@ Inherit other flags from pipeline e.g. the intermediate result may be sorted and/or distinct,
-                //     and is ordered
-                accessor = Nodes.toStreamAccessor(intermediateResult);
+                // Get the combined stream and ops flags for the stateful op
+                int sourceAndOpsFlags = StreamOpFlags.combineStreamFlags(
+                        accessor.getStreamFlags(),
+                        StreamOpFlags.combineOpFlags(op.getOpFlags(), opsFlags[upToOp]));
+                // Get the stream flags for the intermediate stream
+                int streamFlags = StreamOpFlags.flagsToStreamFlags(sourceAndOpsFlags);
+                // Create stream accessor from node using the stream flags
+                accessor = intermediateResult.asStreamAccessor(streamFlags);
 
                 fromOp = ++upToOp;
+
+                if (!accessor.isParallel()) {
+                    return evaluateSequential(accessor, opsFlags, ops, fromOp, ops.length, terminal);
+                }
             }
             else {
-                return evaluateParallel(accessor, pipes, fromOp, upToOp, terminal);
+                return evaluateParallel(accessor, opsFlags, ops, fromOp, upToOp, terminal);
             }
         }
     }
 
+    @SuppressWarnings("unchecked")
     private <R> Node<R> evaluateParallel(StreamAccessor source,
-                                         AbstractPipeline[] pipes, int from, int upTo,
+                                         int[] opsFlags,
+                                         IntermediateOp[] ops, int from, int upTo,
                                          StatefulOp<?, R> terminal) {
-        return (Node<R>) terminal.evaluateParallel(new ParallelImplPipelineHelper(source, pipes, from, upTo));
+        return (Node<R>) terminal.evaluateParallel(new ParallelImplPipelineHelper(source, opsFlags, ops, from, upTo));
     }
 
     private <R> R evaluateParallel(StreamAccessor<?> source,
-                                   AbstractPipeline[] pipes, int from, int upTo,
+                                   int[] opsFlags,
+                                   IntermediateOp[] ops, int from, int upTo,
                                    TerminalOp<E_OUT, R> terminal) {
-        return terminal.evaluateParallel(new ParallelImplPipelineHelper<>(source, pipes, from, upTo));
+        return terminal.evaluateParallel(new ParallelImplPipelineHelper<>(source, opsFlags, ops, from, upTo));
     }
 
-    protected <R> R evaluateSerial(TerminalOp<E_OUT, R> terminal) {
-        return terminal.evaluateSequential(new SequentialImplPipelineHelper<>(source, terminal));
+    @SuppressWarnings("unchecked")
+    protected <R> R evaluateSequential(TerminalOp<E_OUT, R> terminal) {
+        return (R) terminal.evaluateSequential(new SequentialImplPipelineHelperSource());
+    }
+
+    private <R> R evaluateSequential(StreamAccessor<?> source,
+                                     int[] opsFlags,
+                                     IntermediateOp[] ops, int from, int upTo,
+                                     TerminalOp<E_OUT, R> terminal) {
+        return terminal.evaluateSequential(new SequentialImplPipelineHelper<>(source, opsFlags, ops, from, upTo));
     }
 
     static abstract class AbstractPipelineHelper<P_IN, P_OUT> implements PipelineHelper<P_IN, P_OUT> {
         protected final StreamAccessor<P_IN> source;
-        final AbstractPipeline[] pipes;
+        final IntermediateOp[] ops;
+        final int[] opsFlags;
         final int from;
         final int upTo;
         final int sourceAndOpsFlags;
 
-        AbstractPipelineHelper(StreamAccessor<P_IN> source, AbstractPipeline[] pipes) {
-            // Start from 2nd element since 1st is the head containing the source
-            this(source, pipes, 1, pipes.length);
-        }
-
-        AbstractPipelineHelper(StreamAccessor<P_IN> source, AbstractPipeline[] pipes, int from, int upTo) {
+        AbstractPipelineHelper(StreamAccessor<P_IN> source, int[] opsFlags, IntermediateOp[] ops, int from, int upTo) {
             this.source = source;
-            this.pipes = pipes;
+            this.opsFlags = opsFlags;
+            this.ops = ops;
             this.from = from;
             this.upTo = upTo;
-            this.sourceAndOpsFlags = StreamOpFlags.combineStreamFlags(
-                    source.getStreamFlags(),
-                    pipes[upTo - 1].combinedOpsFlags);
+
+            int flags = opsFlags[from] = StreamOpFlags.INITIAL_OPS_VALUE;
+            for (int i = from; i < upTo; i++) {
+                flags = opsFlags[i + 1] = StreamOpFlags.combineOpFlags(ops[i].getOpFlags(), flags);
+            }
+
+            this.sourceAndOpsFlags = StreamOpFlags.combineStreamFlags(source.getStreamFlags(), flags);
         }
 
         protected boolean isOutputSizeKnown() {
@@ -196,8 +249,8 @@
             Objects.requireNonNull(sink);
 
             for (int i = upTo - 1; i >= from; i--) {
-                sink = pipes[i].op.wrapSink(
-                        StreamOpFlags.combineStreamFlags(source.getStreamFlags(), pipes[i - 1].combinedOpsFlags),
+                sink = ops[i].wrapSink(
+                        StreamOpFlags.combineStreamFlags(source.getStreamFlags(), opsFlags[i]),
                         sink);
             }
             return sink;
@@ -208,8 +261,8 @@
             Objects.requireNonNull(it);
 
             for (int i = from; i < upTo; i++) {
-                it = pipes[i].op.wrapIterator(
-                        StreamOpFlags.combineStreamFlags(source.getStreamFlags(), pipes[i - 1].combinedOpsFlags),
+                it = ops[i].wrapIterator(
+                        StreamOpFlags.combineStreamFlags(source.getStreamFlags(), opsFlags[i]),
                         it);
             }
             return it;
@@ -218,53 +271,83 @@
 
     class SequentialImplPipelineHelper<P_IN> extends AbstractPipelineHelper<P_IN, E_OUT> {
 
-        <R> SequentialImplPipelineHelper(StreamAccessor<P_IN> source, TerminalOp<E_OUT, R> terminal) {
-            super(source, pipes());
+        SequentialImplPipelineHelper(StreamAccessor<P_IN> source, IntermediateOp[] ops) {
+            // Start from 2nd element since 1st is the head containing the source
+            super(source, new int[ops.length + 1], ops, 0, ops.length);
+        }
+
+        SequentialImplPipelineHelper(StreamAccessor<P_IN> source, int[] opsFlags, IntermediateOp[] ops, int from, int to) {
+            super(source, opsFlags, ops, from, to);
         }
 
         @Override
         public int getOutputSizeIfKnown() {
-            return (iterator == null && isOutputSizeKnown()) ? source.getSizeIfKnown() : -1;
+            return isOutputSizeKnown() ? source.getSizeIfKnown() : -1;
+        }
+
+        boolean requirePull() {
+            return isShortCircuit();
         }
 
         @Override
         public<S extends Sink<E_OUT>> S into(S sink) {
             Objects.requireNonNull(sink);
 
-            if (isShortCircuit() || iterator != null) {
-                Iterator<E_OUT> it = iterator();
+            if (requirePull()) {
                 sink.begin(-1);
-                while (it.hasNext())
-                    sink.accept(it.next());
+                iterator().forEach(sink);
                 sink.end();
             }  else {
                 super.into(sink);
             }
+
             return sink;
         }
 
         @Override
-        public Iterator<E_OUT> iterator() {
-            return AbstractPipeline.this.iterator();
-        }
-
-        @Override
         public Node<E_OUT> collectOutput() {
             if (hasZeroDepth() && source instanceof Nodes.NodeStreamAccessor) {
                 // If the stream accessor holds a node
                 // @@@ can abstract with StreamAccessor.asArray ?
-                    return ((Nodes.NodeStreamAccessor<E_OUT>) source).asNode();
+                return ((Nodes.NodeStreamAccessor<E_OUT>) source).asNode();
             }
             else
                 return into(Nodes.<E_OUT>makeBuilder(getOutputSizeIfKnown())).build();
         }
+
+        @Override
+        public Iterator<E_OUT> iterator() {
+            return wrapIterator(source.iterator());
+        }
+    }
+
+    class SequentialImplPipelineHelperSource extends SequentialImplPipelineHelper {
+
+        <R> SequentialImplPipelineHelperSource() {
+            super(AbstractPipeline.this.source, ops());
+        }
+
+        @Override
+        boolean requirePull() {
+            return isPipelinePulled() ? true : super.requirePull();
+        }
+
+        @Override
+        public int getOutputSizeIfKnown() {
+            return isPipelinePulled() ? -1 : super.getOutputSizeIfKnown();
+        }
+
+        @Override
+        public Iterator<E_OUT> iterator() {
+            return AbstractPipeline.this.iterator();
+        }
     }
 
     class ParallelImplPipelineHelper<P_IN> extends AbstractPipelineHelper<P_IN, E_OUT> implements ParallelPipelineHelper<P_IN, E_OUT> {
         final long targetSize;
 
-        ParallelImplPipelineHelper(StreamAccessor<P_IN> source, AbstractPipeline[] pipes, int from, int to) {
-            super(source, pipes, from, to);
+        ParallelImplPipelineHelper(StreamAccessor<P_IN> source, int[] opsFlags, IntermediateOp[] ops, int from, int to) {
+            super(source, opsFlags, ops, from, to);
             this.targetSize = calculateTargetSize();
         }
 
@@ -310,25 +393,56 @@
 
     }
 
-    // Return the array of pipes, including the head that represents the source
-    private AbstractPipeline[] pipes() {
-        AbstractPipeline[] pipes = new AbstractPipeline[depth + 1];
+    private IntermediateOp[] ops() {
+        IntermediateOp[] ops = new IntermediateOp[depth];
         AbstractPipeline p = this;
-        for (int i = depth; i >= 0; i--) {
-            pipes[i] = p;
+        for (int i = depth - 1; i >= 0; i--) {
+            ops[i] = p.op;
             p = p.upstream;
         }
-        return pipes;
+        return ops;
+    }
+
+    private boolean isPipelinePulled() {
+        if (iterator != null)
+            return true;
+
+        AbstractPipeline p = this.upstream;
+        while (p != null) {
+            if (p.iterator != null) {
+                // @@@ Should iterator() be called to aggressively update this and all upstream non-null iterators?
+                return true;
+            }
+            p = p.upstream;
+        }
+
+        return false;
     }
 
     @SuppressWarnings("unchecked")
     public Iterator<E_OUT> iterator() {
         if (iterator == null) {
-            iterator = (Iterator) ((op == null)
-                                   ? source.iterator()
-                                   : op.wrapIterator(StreamOpFlags.combineStreamFlags(source.getStreamFlags(),
-                                                                                      upstream.combinedOpsFlags),
-                                                     upstream.iterator()));
+            AbstractPipeline[] pipes = new AbstractPipeline[depth + 1];
+            AbstractPipeline p = this;
+            for (int i = depth; i >= 0; i--) {
+                pipes[i] = p;
+                p = p.upstream;
+            }
+
+            // Check if this is the first pull and if so get the source iterator
+            if (pipes[0].iterator == null) {
+                pipes[0].iterator = source.iterator();
+            }
+
+            int opsFlags = StreamOpFlags.INITIAL_OPS_VALUE;
+            for (int i = 1; i <= depth; i++) {
+                p = pipes[i];
+                if (p.iterator == null) {
+                    p.iterator = p.op.wrapIterator(StreamOpFlags.combineStreamFlags(source.getStreamFlags(), opsFlags),
+                                                   pipes[i - 1].iterator);
+                }
+                opsFlags = StreamOpFlags.combineOpFlags(pipes[i].op.getOpFlags(), opsFlags);
+            }
         }
         return iterator;
     }
--- a/src/share/classes/java/util/streams/StreamOpFlags.java	Fri Nov 02 09:35:49 2012 +0100
+++ b/src/share/classes/java/util/streams/StreamOpFlags.java	Fri Nov 02 11:36:01 2012 +0100
@@ -24,6 +24,8 @@
  */
 package java.util.streams;
 
+import java.util.EnumSet;
+
 /**
  * Flags for known and not known properties of streams and operations.
  *
@@ -88,6 +90,30 @@
     }
 
     /**
+     *
+     * @return the set flag.
+     */
+    public int set() {
+        return set;
+    }
+
+    /**
+     *
+     * @return the clear flag.
+     */
+    public int clear() {
+        return clear;
+    }
+
+    /**
+     *
+     * @return true if a stream-based flag, otherwise false.
+     */
+    public boolean isStreamFlag() {
+        return isStreamFlag;
+    }
+
+    /**
      * Check if a property is known on the combined stream and operations flags.
      *
      * @param flags the combined stream and operations flags.
@@ -98,6 +124,16 @@
     }
 
     /**
+     * Check if a property is preserved on the combined and operation flags.
+     *
+     * @param flags the combined stream and operations flags.
+     * @return true if the property is preserved, otherwise false.
+     */
+    public boolean isPreserve(int flags) {
+        return (flags & preserve) == preserve;
+    }
+
+    /**
      * Check if a property is known on the combined operations flags.
      *
      * @param opFlags the combined operations flags.
@@ -228,10 +264,35 @@
      * @return the result of combination.
      */
     public static int combineStreamFlags(int streamFlags, int opsFlags) {
-        // 0x01 nibbles are transformed to 0x11
-        // Then all the bits are flipped
+        // For stream flags all 0x00 nibbles are transformed to 0x11
         // Then the result is logically and'ed with the combined operation flags
         // 0x01 nibbles correspond to set flags
         return (FLAG_MASK ^ ((FLAG_MASK_IS & streamFlags) << 1)) & opsFlags;
     }
+
+    /**
+     * Obtain flags that can be set on a stream given combined stream and operation flags.
+     *
+     * @param flags the combined stream and operation flags.
+     * @return flags for a stream, all known properties are propagated and all not-known properties are ignored.
+     */
+    public static int flagsToStreamFlags(int flags) {
+        // By flipping the nibbles 0x11 become 0x00 and 0x01 become 0x10
+        // Shift left 1 to restore set flags and mask off anything other than the set flags
+        return ((~flags) >> 1) & FLAG_MASK_IS & flags;
+    }
+
+    /**
+     * Creates an enum set containing all stream-based flags.
+     *
+     */
+    public static EnumSet<StreamOpFlags> allOfStreamFlags() {
+        EnumSet<StreamOpFlags> flags = EnumSet.allOf(StreamOpFlags.class);
+        for (StreamOpFlags f : flags) {
+            if (!f.isStreamFlag()) {
+                flags.remove(f);
+            }
+        }
+        return flags;
+    }
 }
--- a/src/share/classes/java/util/streams/ValuePipeline.java	Fri Nov 02 09:35:49 2012 +0100
+++ b/src/share/classes/java/util/streams/ValuePipeline.java	Fri Nov 02 11:36:01 2012 +0100
@@ -160,13 +160,7 @@
 
     @Override
     public Stream<U> sequential() {
-        if (!isParallel()) {
-            return this;
-        }
-        else {
-            Node<U> collected = evaluate(CollectorOp.<U>singleton());
-            return Streams.stream(collected, collected.size());
-        }
+        return chainValue(CollectorOps.<U>sequentialCollector());
     }
 
     @Override
--- a/src/share/classes/java/util/streams/ops/CollectorOp.java	Fri Nov 02 09:35:49 2012 +0100
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,50 +0,0 @@
-/*
- * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved.
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This code is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 only, as
- * published by the Free Software Foundation.  Oracle designates this
- * particular file as subject to the "Classpath" exception as provided
- * by Oracle in the LICENSE file that accompanied this code.
- *
- * This code is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
- * version 2 for more details (a copy is included in the LICENSE file that
- * accompanied this code).
- *
- * You should have received a copy of the GNU General Public License version
- * 2 along with this work; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
- *
- * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
- * or visit www.oracle.com if you need additional information or have any
- * questions.
- */
-package java.util.streams.ops;
-
-import java.util.streams.ParallelPipelineHelper;
-import java.util.streams.PipelineHelper;
-
-public class CollectorOp<E_IN> implements TerminalOp<E_IN, Node<E_IN>> {
-
-    private static CollectorOp<?> INSTANCE = new CollectorOp<>();
-
-    @SuppressWarnings("unchecked")
-    public static <E_IN> CollectorOp<E_IN> singleton() {
-        return (CollectorOp<E_IN>) INSTANCE;
-    }
-
-    private CollectorOp() { }
-
-    @Override
-    public <P_IN> Node<E_IN> evaluateSequential(PipelineHelper<P_IN, E_IN> helper) {
-        return helper.into(Nodes.<E_IN>makeBuilder(helper.getOutputSizeIfKnown())).build();
-    }
-
-    @Override
-    public <P_IN> Node<E_IN> evaluateParallel(ParallelPipelineHelper<P_IN, E_IN> helper) {
-        return TreeUtils.collect(helper, false);
-    }
-}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/share/classes/java/util/streams/ops/CollectorOps.java	Fri Nov 02 11:36:01 2012 +0100
@@ -0,0 +1,128 @@
+/*
+ * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.  Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+package java.util.streams.ops;
+
+import java.util.Iterator;
+import java.util.streams.ParallelPipelineHelper;
+import java.util.streams.PipelineHelper;
+import java.util.streams.Sink;
+
+public final class CollectorOps {
+    private CollectorOps() { }
+
+    private static Sequential<?> SEQUENTIAL_COLLECTOR_OP = new Sequential<>();
+
+    @SuppressWarnings("unchecked")
+    public static <E_IN> Sequential<E_IN> sequentialCollector() {
+        return (Sequential<E_IN>) SEQUENTIAL_COLLECTOR_OP;
+    }
+
+    private static Parallel<?> PARALLEL_COLLECTOR_OP = new Parallel<>();
+
+    @SuppressWarnings("unchecked")
+    public static <E_IN> Parallel<E_IN> parallelCollector() {
+        return (Parallel<E_IN>) PARALLEL_COLLECTOR_OP;
+    }
+
+    private static Terminal<?> TERMINAL_COLLECTOR_OP = new Terminal<>();
+
+    @SuppressWarnings("unchecked")
+    public static <E_IN> Terminal<E_IN> terminalCollector() {
+        return (Terminal<E_IN>) TERMINAL_COLLECTOR_OP;
+    }
+
+    /**
+     * Collect elements into a Node, if evaluated in parallel, and force sequential evaluation on upstream operations,
+     * otherwise, if evaluated sequentially, this operation is a no-op.
+     */
+    public static final class Sequential<E_IN> implements StatefulOp<E_IN, E_IN> {
+
+        private Sequential() { }
+
+        @Override
+        public Iterator<E_IN> wrapIterator(int flags, Iterator<E_IN> in) {
+            return in;
+        }
+
+        @Override
+        public Sink<E_IN> wrapSink(int flags, Sink<E_IN> sink) {
+            return sink;
+        }
+
+        @Override
+        public <P_IN> Node<E_IN> evaluateSequential(PipelineHelper<P_IN, E_IN> helper) {
+            return helper.collectOutput();
+        }
+
+        @Override
+        public <P_IN> Node<E_IN> evaluateParallel(ParallelPipelineHelper<P_IN, E_IN> helper) {
+            // Force sequential evaluation on upstream operations
+            return Nodes.withSequentialStreamAccessor(helper.collectOutput());
+        }
+    }
+
+    public static final class Parallel<E_IN> implements StatefulOp<E_IN, E_IN> {
+
+        private Parallel() { }
+
+        @Override
+        public Iterator<E_IN> wrapIterator(int flags, Iterator<E_IN> in) {
+            return in;
+        }
+
+        @Override
+        public Sink<E_IN> wrapSink(int flags, Sink<E_IN> sink) {
+            return sink;
+        }
+
+        @Override
+        public <P_IN> Node<E_IN> evaluateSequential(PipelineHelper<P_IN, E_IN> helper) {
+            return helper.collectOutput();
+        }
+
+        @Override
+        public <P_IN> Node<E_IN> evaluateParallel(ParallelPipelineHelper<P_IN, E_IN> helper) {
+            return helper.collectOutput();
+        }
+    }
+
+    /**
+     * Collect elements into a Node that is the result of terminal evaluation.
+     */
+    public static final class Terminal<E_IN> implements TerminalOp<E_IN, Node<E_IN>> {
+
+        private Terminal() { }
+
+        @Override
+        public <P_IN> Node<E_IN> evaluateParallel(ParallelPipelineHelper<P_IN, E_IN> helper) {
+            return helper.collectOutput();
+        }
+
+        @Override
+        public <P_IN> Node<E_IN> evaluateSequential(PipelineHelper<P_IN, E_IN> helper) {
+            return helper.collectOutput();
+        }
+    }
+}
--- a/src/share/classes/java/util/streams/ops/Node.java	Fri Nov 02 09:35:49 2012 +0100
+++ b/src/share/classes/java/util/streams/ops/Node.java	Fri Nov 02 11:36:01 2012 +0100
@@ -26,6 +26,7 @@
 
 import java.util.*;
 import java.util.streams.Spliterator;
+import java.util.streams.StreamAccessor;
 
 public interface Node<T> extends Iterable<T>, Sized {
 
@@ -81,4 +82,13 @@
      */
     void copyInto(T[] array, int offset) throws IndexOutOfBoundsException;
 
+    /**
+     * View this node as a stream accessor.
+     *
+     * @param flags the stream accessor flags.
+     * @return the stream accessor.
+     */
+    StreamAccessor<T> asStreamAccessor(int flags) default {
+        return Nodes.toStreamAccessor(this, flags);
+    }
 }
--- a/src/share/classes/java/util/streams/ops/Nodes.java	Fri Nov 02 09:35:49 2012 +0100
+++ b/src/share/classes/java/util/streams/ops/Nodes.java	Fri Nov 02 11:36:01 2012 +0100
@@ -34,8 +34,83 @@
         Node<T> asNode();
     }
 
-    public static <T> NodeStreamAccessor<T> toStreamAccessor(Node<T> node) {
-        return new NodeStreamAccessorImpl<>(node);
+    /**
+     * Adapt the node to be a node with a sequential (non-parallel) stream accessor.
+     *
+     * @param node the node to adapt.
+     * @return a node with a sequential stream accessor.
+     */
+    public static <T> Node<T> withSequentialStreamAccessor(Node<T> node) {
+        return (node instanceof NodeSequentialStreamAccessor) ? node : new NodeSequentialStreamAccessor<>(node);
+    }
+
+    private static class NodeSequentialStreamAccessor<T> implements Node<T> {
+        private final Node<T> node;
+
+        public NodeSequentialStreamAccessor(Node<T> node) {this.node = node;}
+
+        @Override
+        public Spliterator<T> spliterator() {
+            return node.spliterator();
+        }
+
+        @Override
+        public int getChildCount() {
+            return node.getChildCount();
+        }
+
+        @Override
+        public Iterator<Node<T>> children() {
+            return node.children();
+        }
+
+        @Override
+        public Node flatten() {
+            return node.flatten();
+        }
+
+        @Override
+        public T[] asArray() {
+            return node.asArray();
+        }
+
+        @Override
+        public void copyInto(T[] array, int offset) throws IndexOutOfBoundsException {
+            node.copyInto(array, offset);
+        }
+
+        @Override
+        public StreamAccessor<T> asStreamAccessor(int flags) {
+            return toStreamAccessor(node, flags, false);
+        }
+
+        @Override
+        public Iterator<T> iterator() {
+            return node.iterator();
+        }
+
+        @Override
+        public int size() {
+            return node.size();
+        }
+
+        @Override
+        public boolean isEmpty() {
+            return node.isEmpty();
+        }
+
+        @Override
+        public void forEach(Block<? super T> block) {
+            node.forEach(block);
+        }
+    }
+
+    static <T> NodeStreamAccessor<T> toStreamAccessor(Node<T> node, int flags) {
+        return new NodeStreamAccessorImpl<>(node, flags);
+    }
+
+    static <T> NodeStreamAccessor<T> toStreamAccessor(Node<T> node, int flags, boolean isParallel) {
+        return new NodeStreamAccessorImpl<>(node, flags, isParallel);
     }
 
     // @@@ This is a copy of Streams.SpliteratorStreamAccessor
@@ -44,12 +119,19 @@
         private final Spliterator<T> spliterator;
         private final int sizeOrUnknown;
         private final int flags;
+        private final boolean isParallel;
 
-        public NodeStreamAccessorImpl(Node<T> node) {
+        NodeStreamAccessorImpl(Node<T> node, int flags) {
+            this(node, flags, true);
+        }
+
+        NodeStreamAccessorImpl(Node<T> node, int flags, boolean isParallel) {
             this.node = node;
             this.spliterator = node.spliterator();
             this.sizeOrUnknown = node.size();
-            this.flags = StreamOpFlags.IS_ORDERED;
+            // @@@ Injecting order may depend on if order has been explicitly cleared upstream
+            this.flags = StreamOpFlags.IS_ORDERED | StreamOpFlags.IS_SIZED | flags;
+            this.isParallel = isParallel;
         }
 
         @Override
@@ -86,7 +168,7 @@
 
         @Override
         public boolean isParallel() {
-            return true;
+            return isParallel;
         }
     }
 
@@ -244,7 +326,7 @@
     public static<T> Node<T> node(Stream<? extends T> stream) {
         if (stream instanceof AbstractPipeline) {
             // @@@ Yuk! the cast sucks, but it avoids uncessary wrapping
-            return ((AbstractPipeline<T, T>) stream).pipeline(CollectorOp.<T>singleton());
+            return ((AbstractPipeline<T, T>) stream).pipeline(CollectorOps.<T>terminalCollector());
         }
         else {
             // @@@ This path can occur if there are other implementations of Stream
@@ -506,7 +588,7 @@
         @Override
         public void begin(int size) {
             if (size != array.length) {
-                throw new IllegalStateException(String.format("Begin size %d is not equal to fixed size %s", size, array.length));
+                throw new IllegalStateException(String.format("Begin size %d is not equal to fixed size %d", size, array.length));
             }
 
             curSize = 0;
--- a/test-ng/tests/org/openjdk/tests/java/util/streams/ops/FlagOpTest.java	Fri Nov 02 09:35:49 2012 +0100
+++ b/test-ng/tests/org/openjdk/tests/java/util/streams/ops/FlagOpTest.java	Fri Nov 02 11:36:01 2012 +0100
@@ -27,13 +27,13 @@
 import org.openjdk.tests.java.util.streams.StreamTestDataProvider;
 import org.testng.annotations.Test;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
+import java.util.*;
+import java.util.functions.Factory;
+import java.util.streams.ParallelPipelineHelper;
+import java.util.streams.PipelineHelper;
 import java.util.streams.Sink;
 import java.util.streams.StreamOpFlags;
-import java.util.streams.ops.FlagDeclaringOp;
+import java.util.streams.ops.*;
 
 import static org.openjdk.tests.java.util.LambdaTestHelpers.countTo;
 
@@ -53,7 +53,6 @@
         ops[1].set(ops[0], ops[2]);
         ops[2].set(ops[1], null);
 
-
         exerciseOps(data, ops);
     }
 
@@ -96,66 +95,181 @@
         }
     }
 
-    public void testFlagsAssigned() {
-        TestData<Integer> data = new CollectionTestData<>("List", countTo(10));
-        // @@@ Not possible to get direct acccess to the flags on stream accessor
-        // @@@ Expands stream flag test data
-        int streamFlags = StreamOpFlags.IS_ORDERED | StreamOpFlags.IS_SIZED;
-
-        // @@@ Expand the op flag test data
-        List<Integer> opFlags = Arrays.asList(
-                StreamOpFlags.NOT_DISTINCT,
-                StreamOpFlags.NOT_ORDERED,
-                StreamOpFlags.NOT_SIZED,
-                StreamOpFlags.NOT_SORTED);
-
-        // Calculate combined ops flags
-        List<Integer> opsFlags = new ArrayList<>();
-        opsFlags.add(StreamOpFlags.INITIAL_OPS_VALUE);
-        for (int i = 0; i < opFlags.size(); i++) {
-            opsFlags.add(StreamOpFlags.combineOpFlags(opFlags.get(i), opsFlags.get(i)));
+    public void testFlagsClearAllSet() {
+        int clearAllFlags = 0;
+        for (StreamOpFlags f : EnumSet.allOf(StreamOpFlags.class)) {
+            if (f.isStreamFlag()) {
+                clearAllFlags |= f.clear();
+            }
         }
 
-        // Calculate combined stream and ops flags
-        List<Integer> flags = new ArrayList<>();
-        for (int i : opsFlags) {
-            flags.add(StreamOpFlags.combineStreamFlags(streamFlags, i));
+        EnumSet<StreamOpFlags> known = EnumSet.noneOf(StreamOpFlags.class);
+        EnumSet<StreamOpFlags> notKnown = StreamOpFlags.allOfStreamFlags();
+
+        List<FlagDeclaringOp<Integer>> ops = new ArrayList<>();
+        ops.add(new FlagDeclaringOp<>(clearAllFlags));
+        for (StreamOpFlags f : StreamOpFlags.allOfStreamFlags()) {
+            ops.add(new TestFlagExpectedOp<>(f.set(),
+                                             known.clone(),
+                                             EnumSet.noneOf(StreamOpFlags.class),
+                                             notKnown.clone()));
+            known.add(f);
+            notKnown.remove(f);
+        }
+        ops.add(new TestFlagExpectedOp<>(0,
+                                         known.clone(),
+                                         EnumSet.noneOf(StreamOpFlags.class),
+                                         notKnown.clone()));
+
+        TestData<Integer> data = new CollectionTestData<>("List", countTo(10));
+        @SuppressWarnings("rawtypes")
+        FlagDeclaringOp[] opsArray = ops.toArray(new FlagDeclaringOp[ops.size()]);
+
+        testUsingData(data).without(IntermediateOpTest.PAR_STREAM_TO_ARRAY_CLEAR_SIZED).excerciseOps(opsArray);
+//        exerciseOps(data, opsArray);
+    }
+
+    public void testFlagsSetAllClear() {
+        int setAllFlags = 0;
+        for (StreamOpFlags f : EnumSet.allOf(StreamOpFlags.class)) {
+            if (f.isStreamFlag()) {
+                setAllFlags |= f.set();
+            }
         }
 
-        // Create operations
+        EnumSet<StreamOpFlags> known = StreamOpFlags.allOfStreamFlags();
+        EnumSet<StreamOpFlags> notKnown = EnumSet.noneOf(StreamOpFlags.class);
+
         List<FlagDeclaringOp<Integer>> ops = new ArrayList<>();
-        for (int i = 0; i < opFlags.size(); i++) {
-            ops.add(new TestFlagExpectedOp<>(flags.get(i), opFlags.get(i)));
+        ops.add(new FlagDeclaringOp<>(setAllFlags));
+        for (StreamOpFlags f : StreamOpFlags.allOfStreamFlags()) {
+            ops.add(new TestFlagExpectedOp<>(f.clear(),
+                                             known.clone(),
+                                             EnumSet.noneOf(StreamOpFlags.class),
+                                             notKnown.clone()));
+            known.remove(f);
+            notKnown.add(f);
         }
-        ops.add(new TestFlagExpectedOp<>(flags.get(opFlags.size()), 0));
+        ops.add(new TestFlagExpectedOp<>(0,
+                                         known.clone(),
+                                         EnumSet.noneOf(StreamOpFlags.class),
+                                         notKnown.clone()));
 
-        exerciseOps(data, ops.toArray(createTestFlagExpectedOpArray(ops.size())));
+        TestData<Integer> data = new CollectionTestData<>("List", countTo(10));
+        @SuppressWarnings("rawtypes")
+        FlagDeclaringOp[] opsArray = ops.toArray(new FlagDeclaringOp[ops.size()]);
+
+        testUsingData(data).without(IntermediateOpTest.PAR_STREAM_TO_ARRAY_CLEAR_SIZED).excerciseOps(opsArray);
     }
 
-    @SuppressWarnings({"unchecked", "rawtypes"})
-    static TestFlagExpectedOp<Integer>[] createTestFlagExpectedOpArray(int size) {
-        return new TestFlagExpectedOp[size];
+    public void testFlagsSetParallelCollect() {
+        testFlagsSetSequence(CollectorOps::parallelCollector);
     }
+
+    public void testFlagsSetSequentialCollect() {
+        testFlagsSetSequence(CollectorOps::sequentialCollector);
+    }
+
+    private void testFlagsSetSequence(Factory<StatefulOp<Integer, Integer>> cf) {
+        EnumSet<StreamOpFlags> known = EnumSet.of(StreamOpFlags.ORDERED, StreamOpFlags.SIZED);
+        EnumSet<StreamOpFlags> preserve = EnumSet.of(StreamOpFlags.DISTINCT, StreamOpFlags.SORTED);
+
+        List<IntermediateOp<Integer, Integer>> ops = new ArrayList<>();
+        for (StreamOpFlags f : EnumSet.of(StreamOpFlags.DISTINCT, StreamOpFlags.SORTED)) {
+            ops.add(cf.make());
+            ops.add(new TestFlagExpectedOp<>(f.set(),
+                                             known.clone(),
+                                             preserve.clone(),
+                                             EnumSet.noneOf(StreamOpFlags.class)));
+            known.add(f);
+            preserve.remove(f);
+        }
+        ops.add(cf.make());
+        ops.add(new TestFlagExpectedOp<>(0,
+                                         known.clone(),
+                                         preserve.clone(),
+                                         EnumSet.noneOf(StreamOpFlags.class)));
+
+        TestData<Integer> data = new CollectionTestData<>("List", countTo(10));
+        @SuppressWarnings("rawtypes")
+        IntermediateOp[] opsArray = ops.toArray(new IntermediateOp[ops.size()]);
+
+        testUsingData(data).without(IntermediateOpTest.PAR_STREAM_TO_ARRAY_CLEAR_SIZED).excerciseOps(opsArray);
+    }
+
+
+    public void testFlagsClearParallelCollect() {
+        testFlagsClearSequence(CollectorOps::parallelCollector);
+    }
+
+    public void testFlagsClearSequentialCollect() {
+        testFlagsClearSequence(CollectorOps::sequentialCollector);
+    }
+
+    protected void testFlagsClearSequence(Factory<StatefulOp<Integer, Integer>> cf) {
+        EnumSet<StreamOpFlags> preserve = EnumSet.of(StreamOpFlags.DISTINCT, StreamOpFlags.SORTED);
+        EnumSet<StreamOpFlags> notKnown = EnumSet.noneOf(StreamOpFlags.class);
+
+        List<IntermediateOp<Integer, Integer>> ops = new ArrayList<>();
+        for (StreamOpFlags f : EnumSet.of(StreamOpFlags.DISTINCT, StreamOpFlags.SORTED)) {
+            ops.add(cf.make());
+            ops.add(new TestFlagExpectedOp<>(f.clear(),
+                                             EnumSet.of(StreamOpFlags.ORDERED, StreamOpFlags.SIZED),
+                                             preserve.clone(),
+                                             notKnown));
+            notKnown.add(f);
+            preserve.remove(f);
+        }
+        ops.add(cf.make());
+        ops.add(new TestFlagExpectedOp<>(0,
+                                         EnumSet.of(StreamOpFlags.ORDERED, StreamOpFlags.SIZED),
+                                         preserve.clone(),
+                                         EnumSet.noneOf(StreamOpFlags.class)));
+
+        TestData<Integer> data = new CollectionTestData<>("List", countTo(10));
+        @SuppressWarnings("rawtypes")
+        IntermediateOp[] opsArray = ops.toArray(new IntermediateOp[ops.size()]);
+
+        testUsingData(data).without(IntermediateOpTest.PAR_STREAM_TO_ARRAY_CLEAR_SIZED).excerciseOps(opsArray);
+    }
+
     static class TestFlagExpectedOp<T> extends FlagDeclaringOp<T> {
-        final int downstreamFlags;
+        final EnumSet<StreamOpFlags> known;
+        final EnumSet<StreamOpFlags> preserve;
+        final EnumSet<StreamOpFlags> notKnown;
 
-        TestFlagExpectedOp(int downstreamFlags, int flags) {
+        TestFlagExpectedOp(int flags, EnumSet<StreamOpFlags> known, EnumSet<StreamOpFlags> preserve, EnumSet<StreamOpFlags> notKnown) {
             super(flags);
-            this.downstreamFlags = downstreamFlags;
+            this.known = known;
+            this.preserve = preserve;
+            this.notKnown = notKnown;
         }
 
         @Override
         public Iterator<T> wrapIterator(int flags, Iterator<T> downstream) {
-            assertEquals(downstreamFlags, flags);
+            assertFlags(flags);
             return downstream;
         }
 
         @Override
         @SuppressWarnings({"unchecked", "rawtypes"})
         public Sink<T> wrapSink(int flags, Sink upstream) {
-            assertEquals(downstreamFlags, flags);
+            assertFlags(flags);
             return upstream;
         }
+
+        private void assertFlags(int flags) {
+            for (StreamOpFlags f : known) {
+                assertTrue(f.isKnown(flags), String.format("Flag %s is not known, but should be known.", f.toString()));
+            }
+
+            for (StreamOpFlags f : preserve) {
+                assertTrue(f.isPreserve(flags), String.format("Flag %s is not preserved, but should be preserved.", f.toString()));
+            }
+
+            for (StreamOpFlags f : notKnown) {
+                assertFalse(f.isKnown(flags), String.format("Flag %s is known, but should be not known.", f.toString()));
+            }
+        }
     }
-
 }
--- a/test-ng/tests/org/openjdk/tests/java/util/streams/ops/LimitOpTest.java	Fri Nov 02 09:35:49 2012 +0100
+++ b/test-ng/tests/org/openjdk/tests/java/util/streams/ops/LimitOpTest.java	Fri Nov 02 11:36:01 2012 +0100
@@ -27,6 +27,7 @@
 import org.openjdk.tests.java.util.streams.StreamTestDataProvider;
 import org.testng.annotations.Test;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
@@ -101,4 +102,10 @@
             return Arrays.asList(0, 1, size / 2, size - 1, size, size + 1, 2 * size);
         }
     }
+
+    public void testSequentialShortCircuit() {
+        List<Integer> l = countTo(10).parallel().sequential().limit(5).into(new ArrayList<Integer>());
+        assertEquals(l.size(), 5);
+        assertEquals(l.get(l.size() -1).intValue(), 5);
+    }
 }
--- a/test-ng/tests/org/openjdk/tests/java/util/streams/ops/StreamOpTestCase.java	Fri Nov 02 09:35:49 2012 +0100
+++ b/test-ng/tests/org/openjdk/tests/java/util/streams/ops/StreamOpTestCase.java	Fri Nov 02 11:36:01 2012 +0100
@@ -33,6 +33,7 @@
 import java.util.functions.Block;
 import java.util.functions.Factory;
 import java.util.streams.*;
+import java.util.streams.ops.FlagDeclaringOp;
 import java.util.streams.ops.IntermediateOp;
 import java.util.streams.ops.TerminalOp;
 
@@ -83,6 +84,19 @@
             }
         },
 
+        // Wrap as two connected streams, request iterator for the upstream, and do forEach on the downstream
+        STREAM_MIXED_ITERATOR_FOR_EACH(false) {
+            <T> void run(TestData<T> data, Sink sink, IntermediateOp[] ops) {
+                AbstractPipeline<?, ?> pipe1 = data.seq(new FlagDeclaringOp(0));
+                AbstractPipeline<?, ?> pipe2 = chain(pipe1, ops);
+
+                Stream<?> stream1 = stream(pipe1);
+                Stream<?> stream2 = stream(pipe2);
+                stream1.iterator();
+                stream2.forEach(sink);
+            }
+        },
+
         // Wrap as parallel stream + sequential
         PAR_STREAM_SEQUENTIAL_FOR_EACH(true) {
             <T> void run(TestData<T> data, Sink sink, IntermediateOp[] ops) {
@@ -98,6 +112,17 @@
             }
         },
 
+        // Wrap as parallel stream + toArray and clear SIZED flag
+        PAR_STREAM_TO_ARRAY_CLEAR_SIZED(true) {
+            <T> void run(TestData<T> data, Sink sink, IntermediateOp[] ops) {
+                AbstractPipeline<?, ?> pipe1 = data.seq(new FlagDeclaringOp(StreamOpFlags.NOT_SIZED));
+                AbstractPipeline<?, ?> pipe2 = chain(pipe1, ops);
+
+                for (Object t : stream(pipe2).toArray())
+                    sink.accept(t);
+            }
+        },
+
         // Wrap as parallel stream + into
         PAR_STREAM_SEQUENTIAL_INTO(true) {
             <T> void run(TestData<T> data, Sink sink, IntermediateOp[] ops) {
@@ -119,6 +144,20 @@
             }
         },
 
+        // Wrap as two connected streams, request iterator for the upstream, and do forEach on the downstream
+        PAR_STREAM_MIXED_ITERATOR_TO_ARRAY(true) {
+            <T> void run(TestData<T> data, Sink sink, IntermediateOp[] ops) {
+                AbstractPipeline<?, ?> pipe1 = data.par(new FlagDeclaringOp(0));
+                AbstractPipeline<?, ?> pipe2 = chain(pipe1, ops);
+
+                Stream<?> stream1 = stream(pipe1);
+                Stream<?> stream2 = stream(pipe2);
+                stream1.iterator();
+                for (Object t : stream2.toArray())
+                    sink.accept(t);
+            }
+        },
+
         // Wrap as parallel stream, and iterate in mixed mode
         PAR_STREAM_SEQUENTIAL_MIXED(true) {
             <T> void run(TestData<T> data, Sink sink, IntermediateOp[] ops) {
--- a/test-ng/tests/org/openjdk/tests/java/util/streams/ops/ToArrayOpTest.java	Fri Nov 02 09:35:49 2012 +0100
+++ b/test-ng/tests/org/openjdk/tests/java/util/streams/ops/ToArrayOpTest.java	Fri Nov 02 11:36:01 2012 +0100
@@ -100,19 +100,19 @@
 
         {
             Node<Integer> node = Nodes.node(l);
-            Object[] output = new ValuePipeline<>(Nodes.toStreamAccessor(node)).toArray();
+            Object[] output = new ValuePipeline<>(node.asStreamAccessor(0)).toArray();
             assertEquals(Arrays.asList(output), l);
         }
 
         {
             Node<Integer> node = Nodes.node(l.toArray(new Integer[l.size()]));
-            Object[] output = new ValuePipeline<>(Nodes.toStreamAccessor(node)).toArray();
+            Object[] output = new ValuePipeline<>(node.asStreamAccessor(0)).toArray();
             assertEquals(Arrays.asList(output), l);
         }
 
         {
             Node<Integer> node = tree(l);
-            Object[] output = new ValuePipeline<>(Nodes.toStreamAccessor(node)).toArray();
+            Object[] output = new ValuePipeline<>(node.asStreamAccessor(0)).toArray();
             assertEquals(Arrays.asList(output), l);
         }
 
@@ -121,7 +121,7 @@
             for (Integer i : l) {
                 nodeBuilder.accept(i);
             }
-            Object[] output = new ValuePipeline<>(Nodes.toStreamAccessor(nodeBuilder.build())).toArray();
+            Object[] output = new ValuePipeline<>(nodeBuilder.build().asStreamAccessor(0)).toArray();
             assertEquals(Arrays.asList(output), l);
         }
 
@@ -132,9 +132,16 @@
                 nodeBuilder.accept(i);
             }
             nodeBuilder.end();
-            Object[] output = new ValuePipeline<>(Nodes.toStreamAccessor(nodeBuilder.build())).toArray();
+            Object[] output = new ValuePipeline<>(nodeBuilder.build().asStreamAccessor(0)).toArray();
             assertEquals(Arrays.asList(output), l);
         }
+
+        {
+            Node<Integer> node = Nodes.node(l);
+            Object[] output = new ValuePipeline<>(node.asStreamAccessor(0)).sequential().toArray();
+            assertEquals(Arrays.asList(output), l);
+        }
+
     }
 
     Node<Integer> tree(List<Integer> l) {