changeset 6367:ecf2a2d21af8

- removed StreamAccessor - AbstractPipeline now operates on a Spliterator and flags - the StreamOfFlags.PARALLEL defines whether sequential or parallel evaluation is performed - Simplify static methods on Streams to create a Stream for Iterable Iterator, Spliterator and arrays. (ArrayProxy support is commented out and needs to be revisisted) - Fixed issues with Spliterator of SpinedList returning incorrect known size - Increase test coverage for Node and NodeBuilder instances.
author psandoz
date Sat, 03 Nov 2012 20:12:39 +0100
parents a894d11817f6
children cb5a68c2e6cf
files src/share/classes/com/sun/tools/jdi/EventSetImpl.java src/share/classes/java/awt/EventDispatchThread.java src/share/classes/java/lang/CharSequence.java src/share/classes/java/util/ArrayList.java src/share/classes/java/util/Collection.java src/share/classes/java/util/LinkedHashSet.java src/share/classes/java/util/List.java src/share/classes/java/util/Queue.java src/share/classes/java/util/Set.java src/share/classes/java/util/SortedSet.java src/share/classes/java/util/Vector.java src/share/classes/java/util/streams/AbstractPipeline.java src/share/classes/java/util/streams/Spliterator.java src/share/classes/java/util/streams/StreamAccessor.java src/share/classes/java/util/streams/StreamOpFlags.java src/share/classes/java/util/streams/Streams.java src/share/classes/java/util/streams/ValuePipeline.java src/share/classes/java/util/streams/ops/CollectorOps.java src/share/classes/java/util/streams/ops/Node.java src/share/classes/java/util/streams/ops/Nodes.java src/share/classes/java/util/streams/ops/SortedOp.java test-ng/tests/org/openjdk/tests/java/util/streams/ops/FlagOpTest.java test-ng/tests/org/openjdk/tests/java/util/streams/ops/NodeTest.java test-ng/tests/org/openjdk/tests/java/util/streams/ops/StreamOpTestCase.java test-ng/tests/org/openjdk/tests/java/util/streams/ops/ToArrayOpTest.java
diffstat 25 files changed, 756 insertions(+), 1132 deletions(-) [+]
line wrap: on
line diff
--- a/src/share/classes/com/sun/tools/jdi/EventSetImpl.java	Fri Nov 02 11:36:01 2012 +0100
+++ b/src/share/classes/com/sun/tools/jdi/EventSetImpl.java	Sat Nov 03 20:12:39 2012 +0100
@@ -30,6 +30,8 @@
 import com.sun.jdi.request.*;
 
 import java.util.*;
+import java.util.streams.Stream;
+
 enum EventDestination {UNKNOWN_EVENT, INTERNAL_EVENT, CLIENT_EVENT};
 
 /*
@@ -871,4 +873,9 @@
     public void clear() {
         throw new UnsupportedOperationException();
     }
+
+    @Override
+    public Stream<Event> stream() {
+        return super.stream();
+    }
 }
--- a/src/share/classes/java/awt/EventDispatchThread.java	Fri Nov 02 11:36:01 2012 +0100
+++ b/src/share/classes/java/awt/EventDispatchThread.java	Sat Nov 03 20:12:39 2012 +0100
@@ -120,10 +120,10 @@
     private Conditional _macosxGetConditional(final Object cond) {
         try {
             return new Conditional() {
-                final Method evaluateMethod = Class.forName("sun.lwawt.macosx.EventDispatchAccess").getMethod("evaluate", null);
+                final Method evaluateMethod = Class.forName("sun.lwawt.macosx.EventDispatchAccess").getMethod("evaluate", (Class[]) null);
                 public boolean evaluate() {
                     try {
-                        return ((Boolean)evaluateMethod.invoke(cond, null)).booleanValue();
+                        return ((Boolean)evaluateMethod.invoke(cond, (Object[]) null)).booleanValue();
                     } catch (Exception e) {
                         return false;
                     }
--- a/src/share/classes/java/lang/CharSequence.java	Fri Nov 02 11:36:01 2012 +0100
+++ b/src/share/classes/java/lang/CharSequence.java	Sat Nov 03 20:12:39 2012 +0100
@@ -27,8 +27,10 @@
 
 import java.util.Iterator;
 import java.util.NoSuchElementException;
+import java.util.Sized;
 import java.util.functions.Block;
 import java.util.streams.Stream;
+import java.util.streams.StreamOpFlags;
 import java.util.streams.Streams;
 
 /**
@@ -120,7 +122,7 @@
      * @return an Iterable of Character values from this sequence
      */
     public Stream<Character> asChars() default {
-        return Streams.stream(new CharacterIterable(this), length());
+        return Streams.stream(new CharacterIterable(this), StreamOpFlags.IS_SIZED | StreamOpFlags.IS_ORDERED);
     }
 
     /**
@@ -139,11 +141,11 @@
      * @return an Iterable of Unicode code points from this sequence
      */
     public Stream<Integer> asCodePoints() default {
-        return Streams.stream(new CodePointIterable(this));
+        return Streams.stream(new CodePointIterable(this), StreamOpFlags.IS_ORDERED);
     }
 }
 
-class CharacterIterable implements Iterable<Character> {
+class CharacterIterable implements Iterable<Character>, Sized {
     final CharSequence cs;
 
     CharacterIterable(CharSequence cs) {
@@ -151,6 +153,11 @@
     }
 
     @Override
+    public int size() {
+        return cs.length();
+    }
+
+    @Override
     public void forEach(Block<? super Character> block) {
         for (int i = 0; i < cs.length(); i++)
             block.apply(cs.charAt(i));
--- a/src/share/classes/java/util/ArrayList.java	Fri Nov 02 11:36:01 2012 +0100
+++ b/src/share/classes/java/util/ArrayList.java	Sat Nov 03 20:12:39 2012 +0100
@@ -25,13 +25,6 @@
 
 package java.util;
 
-import java.util.functions.Block;
-import java.util.streams.Spliterator;
-import java.util.streams.Stream;
-import java.util.streams.StreamAccessor;
-import java.util.streams.StreamOpFlags;
-import java.util.streams.Streams;
-
 /**
  * Resizable-array implementation of the <tt>List</tt> interface.  Implements
  * all optional list operations, and permits all elements, including
@@ -1137,20 +1130,20 @@
     }
 
 
-    private class ArrayProxyImpl<E> implements Streams.ArrayProxy<E> {
-        public E[] getArray() { return (E[]) ArrayList.this.elementData; }
-        public int getOffset() { return 0; }
-        public int getLen() { return ArrayList.this.size; }
-        public int getModCount() { return ArrayList.this.modCount;  }
-    }
-
-    @Override
-    public Stream<E> stream() {
-        return Streams.stream(new ArrayProxyImpl<E>());
-    }
-
-    @Override
-    public Stream<E> parallel() {
-        return Streams.parallel(new ArrayProxyImpl<E>());
-    }
+//    private class ArrayProxyImpl<E> implements Streams.ArrayProxy<E> {
+//        public E[] getArray() { return (E[]) ArrayList.this.elementData; }
+//        public int getOffset() { return 0; }
+//        public int getLen() { return ArrayList.this.size; }
+//        public int getModCount() { return ArrayList.this.modCount;  }
+//    }
+//
+//    @Override
+//    public Stream<E> stream() {
+//        return Streams.stream(new ArrayProxyImpl<E>());
+//    }
+//
+//    @Override
+//    public Stream<E> parallel() {
+//        return Streams.parallel(new ArrayProxyImpl<E>());
+//    }
 }
--- a/src/share/classes/java/util/Collection.java	Fri Nov 02 11:36:01 2012 +0100
+++ b/src/share/classes/java/util/Collection.java	Sat Nov 03 20:12:39 2012 +0100
@@ -27,6 +27,7 @@
 
 import java.util.functions.Predicate;
 import java.util.streams.Stream;
+import java.util.streams.StreamOpFlags;
 import java.util.streams.Streamable;
 import java.util.streams.Streams;
 
@@ -491,7 +492,7 @@
 
     @Override
     Stream<E> stream() default {
-        return Streams.stream(this);
+        return Streams.stream(this, StreamOpFlags.IS_SIZED);
     }
 
     @Override
--- a/src/share/classes/java/util/LinkedHashSet.java	Fri Nov 02 11:36:01 2012 +0100
+++ b/src/share/classes/java/util/LinkedHashSet.java	Sat Nov 03 20:12:39 2012 +0100
@@ -175,6 +175,6 @@
 
     @Override
     public Stream<E> stream() {
-        return Streams.stream(this, StreamOpFlags.IS_DISTINCT | StreamOpFlags.IS_ORDERED);
+        return Streams.stream(this, StreamOpFlags.IS_SIZED | StreamOpFlags.IS_DISTINCT | StreamOpFlags.IS_ORDERED);
     }
 }
--- a/src/share/classes/java/util/List.java	Fri Nov 02 11:36:01 2012 +0100
+++ b/src/share/classes/java/util/List.java	Sat Nov 03 20:12:39 2012 +0100
@@ -616,7 +616,6 @@
 
     @Override
     Stream<E> stream() default {
-        // @@@ If instance of RandomAccess, then can choose optimial implementation RandomAccessListStreamAccessor
-        return Streams.stream(this, StreamOpFlags.IS_ORDERED);
+        return Streams.stream(this, StreamOpFlags.IS_SIZED | StreamOpFlags.IS_ORDERED);
     }
 }
--- a/src/share/classes/java/util/Queue.java	Fri Nov 02 11:36:01 2012 +0100
+++ b/src/share/classes/java/util/Queue.java	Sat Nov 03 20:12:39 2012 +0100
@@ -35,6 +35,10 @@
 
 package java.util;
 
+import java.util.streams.Stream;
+import java.util.streams.StreamOpFlags;
+import java.util.streams.Streams;
+
 /**
  * A collection designed for holding elements prior to processing.
  * Besides basic {@link java.util.Collection Collection} operations,
--- a/src/share/classes/java/util/Set.java	Fri Nov 02 11:36:01 2012 +0100
+++ b/src/share/classes/java/util/Set.java	Sat Nov 03 20:12:39 2012 +0100
@@ -389,6 +389,6 @@
 
     @Override
     Stream<E> stream() default {
-        return Streams.stream(this, StreamOpFlags.IS_DISTINCT);
+        return Streams.stream(this, StreamOpFlags.IS_SIZED | StreamOpFlags.IS_DISTINCT);
     }
 }
--- a/src/share/classes/java/util/SortedSet.java	Fri Nov 02 11:36:01 2012 +0100
+++ b/src/share/classes/java/util/SortedSet.java	Sat Nov 03 20:12:39 2012 +0100
@@ -226,6 +226,7 @@
 
     @Override
     Stream<E> stream() default {
-        return Streams.stream(this, StreamOpFlags.IS_DISTINCT | StreamOpFlags.IS_SORTED | StreamOpFlags.IS_ORDERED);
+        return Streams.stream(this, StreamOpFlags.IS_SIZED | StreamOpFlags.IS_DISTINCT
+                                    | StreamOpFlags.IS_SORTED | StreamOpFlags.IS_ORDERED);
     }
 }
--- a/src/share/classes/java/util/Vector.java	Fri Nov 02 11:36:01 2012 +0100
+++ b/src/share/classes/java/util/Vector.java	Sat Nov 03 20:12:39 2012 +0100
@@ -1213,23 +1213,20 @@
         }
     }
 
-    @Override
-    public Stream<E> stream() {
-        return Streams.stream(new Streams.ArrayProxy() {
-                public E[] getArray() { return (E[]) Vector.this.elementData; }
-                public int getOffset() { return 0; }
-                public int getLen() { return Vector.this.elementCount; }
-                public int getModCount() { return Vector.this.modCount;  }
-                });
-    }
-
-    @Override
-    public Stream<E> parallel() {
-        return Streams.parallel(new Streams.ArrayProxy() {
-                public E[] getArray() { return (E[]) Vector.this.elementData; }
-                public int getOffset() { return 0; }
-                public int getLen() { return Vector.this.elementCount; }
-                public int getModCount() { return Vector.this.modCount;  }
-                });
-    }
+//    private class ArrayProxyImpl<E> implements Streams.ArrayProxy<E> {
+//        public E[] getArray() { return (E[]) Vector.this.elementData; }
+//        public int getOffset() { return 0; }
+//        public int getLen() { return Vector.this.elementCount; }
+//        public int getModCount() { return Vector.this.modCount;  }
+//    }
+//
+//    @Override
+//    public Stream<E> stream() {
+//        return Streams.stream(new ArrayProxyImpl<E>());
+//    }
+//
+//    @Override
+//    public Stream<E> parallel() {
+//        return Streams.parallel(new ArrayProxyImpl<E>());
+//    }
 }
--- a/src/share/classes/java/util/streams/AbstractPipeline.java	Fri Nov 02 11:36:01 2012 +0100
+++ b/src/share/classes/java/util/streams/AbstractPipeline.java	Sat Nov 03 20:12:39 2012 +0100
@@ -39,9 +39,11 @@
  */
 public abstract class AbstractPipeline<E_IN, E_OUT> {
     protected final AbstractPipeline<?, E_IN> upstream;
-    protected final StreamAccessor<?> source;
     protected final IntermediateOp<E_IN, E_OUT> op;
+    protected final Spliterator<?> spliterator;
     protected final int depth;
+    protected final int sourceFlags;
+    protected final StreamShape shape;
 
     /**
      * Evaluation works in two modes:
@@ -49,7 +51,7 @@
      * 1) Sequential; and
      * 2) Parallel.
      *
-     * Sequential evaluation is performed when a stream accessor does not support splitting or when splitting is no
+     * Sequential evaluation is performed when a spliterator does not support splitting or when splitting is no
      * longer possible because some part of the pipeline has already been pulled.
      *
      * Parallel evaluation slices up the pipeline into a list of intermediate slices and a terminal slice.
@@ -60,19 +62,19 @@
      *
      * Each intermediate slice is evaluated, in order, and in parallel to produce an intermediate list of elements
      * that is held by a Node.
-     * That Node is converted to a StreamAccessor which becomes is the source or input into the next intermediate
+     * That Node is converted to a Spliterator which becomes is the source or input into the next intermediate
      * or terminal slice.
      * The process repeats until the terminal slice is processed to produce a result.
      *
-     * If an intermediate result is a Node that when converted to a stream accessor does not support splitting then all
+     * If an intermediate result is a Node that when converted to a Spliterator does not support splitting then all
      * the remaining operations in the pipeline (the next operation after the stateful operation in the slice and
      * up to and including the terminal operation) are evaluated sequentially.
      *
-     * Combined stream and operation flags are calculated for each slice of the pipeline. When a stream accessor for
-     * a Node is created the resultant stream and operation flags produced from the intermediate stateful operation
-     * are utilized to create the stream flags for the stream accessor. This ensures stream properties are
-     * preserved and propagated for the next intermediate evaluation. In addition the SIZED property will be
-     * explicitly set, since the Node knows it's size.
+     * Combined stream and operation flags are calculated for each slice of the pipeline. When a spliterator for
+     * a Node is created the resultant source and operation flags produced from the intermediate stateful operation
+     * are utilized to create the source flags for that spliterator. This ensures stream properties are preserved and
+     * propagated for the next intermediate evaluation. In addition the SIZED property will be explicitly set, since
+     * the Node knows it's size.
      */
 
     /**
@@ -83,13 +85,17 @@
     /**
      * Constructor for the element source of a pipeline.
      *
-     * @param source element source
+     * @param spliterator a Spliterator describing the input elements
+     * @param sourceFlags the source flags associated with the spliterator
+     * @param shape the shape of the source
      */
-    protected AbstractPipeline(StreamAccessor<?> source) {
-        this.source = Objects.requireNonNull(source);
+    protected AbstractPipeline(Spliterator<?> spliterator, int sourceFlags, StreamShape shape) {
+        this.upstream = null;
         this.op = null;
-        this.upstream = null;
+        this.spliterator = Objects.requireNonNull(spliterator);
         this.depth = 0;
+        this.sourceFlags = sourceFlags;
+        this.shape = shape;
     }
 
     /**
@@ -101,8 +107,10 @@
     protected AbstractPipeline(AbstractPipeline<?, E_IN> upstream, IntermediateOp<E_IN, E_OUT> op) {
         this.upstream = Objects.requireNonNull(upstream);
         this.op = Objects.requireNonNull(op);
-        this.source = upstream.source;
+        this.spliterator = upstream.spliterator;
         this.depth = upstream.depth + 1;
+        this.sourceFlags = upstream.sourceFlags;
+        this.shape = upstream.shape;
         assert upstream.getShape() == op.inputShape();
         assert (upstream.depth == 0) ^ (upstream.op != null);
     }
@@ -113,7 +121,7 @@
 
     protected<R> R evaluate(TerminalOp<E_OUT, R> terminal) {
         // @@@ NYI If the source size estimate is small, don't bother going parallel
-        if (source.isParallel()) {
+        if (StreamOpFlags.PARALLEL.isKnown(sourceFlags)) {
             return evaluateParallel(terminal);
         }
         else
@@ -133,49 +141,58 @@
         final int[] opsFlags = new int[ops.length + 1];
         int fromOp = 0;
         int upToOp = 0;
-        StreamAccessor<?> accessor = source;
+        Node<?> iNode = null;
+        Spliterator<?> iSource = spliterator;
+        int iSourceFlags = sourceFlags;
         while (true) {
             while (upToOp < ops.length && !ops[upToOp].isStateful())
                 upToOp++;
 
             if (upToOp < ops.length) {
-                StatefulOp op = (StatefulOp) ops[upToOp];
-                Node<?> intermediateResult = evaluateParallel(accessor, opsFlags, ops, fromOp, upToOp, op);
+                StatefulOp<?, ?> op = (StatefulOp) ops[upToOp];
+                iNode = evaluateParallel(iNode, iSource, iSourceFlags, opsFlags, ops, fromOp, upToOp, op);
 
                 // Get the combined stream and ops flags for the stateful op
                 int sourceAndOpsFlags = StreamOpFlags.combineStreamFlags(
-                        accessor.getStreamFlags(),
+                        iSourceFlags,
                         StreamOpFlags.combineOpFlags(op.getOpFlags(), opsFlags[upToOp]));
-                // Get the stream flags for the intermediate stream
-                int streamFlags = StreamOpFlags.flagsToStreamFlags(sourceAndOpsFlags);
+                // Get the source flags for the intermediate stream
+                iSourceFlags = StreamOpFlags.flagsToStreamFlags(sourceAndOpsFlags);
                 // Create stream accessor from node using the stream flags
-                accessor = intermediateResult.asStreamAccessor(streamFlags);
+                iSource = iNode.spliterator();
 
                 fromOp = ++upToOp;
 
-                if (!accessor.isParallel()) {
-                    return evaluateSequential(accessor, opsFlags, ops, fromOp, ops.length, terminal);
+                if (!StreamOpFlags.PARALLEL.isKnown(iSourceFlags)) {
+                    return evaluateSequential(iNode, iSource, iSourceFlags, opsFlags, ops, fromOp, ops.length, terminal);
                 }
             }
             else {
-                return evaluateParallel(accessor, opsFlags, ops, fromOp, upToOp, terminal);
+                return evaluateParallel(iNode, iSource, iSourceFlags, opsFlags, ops, fromOp, upToOp, terminal);
             }
         }
     }
 
     @SuppressWarnings("unchecked")
-    private <R> Node<R> evaluateParallel(StreamAccessor source,
-                                         int[] opsFlags,
-                                         IntermediateOp[] ops, int from, int upTo,
-                                         StatefulOp<?, R> terminal) {
-        return (Node<R>) terminal.evaluateParallel(new ParallelImplPipelineHelper(source, opsFlags, ops, from, upTo));
+    private <R> R evaluateParallel(Node<?> node,
+                                   Spliterator<?> spliterator,
+                                   int sourceFlags,
+                                   int[] opsFlags,
+                                   IntermediateOp[] ops, int from, int upTo,
+                                   StreamOp<?, R> terminal) {
+        return (R) terminal.evaluateParallel(new ParallelImplPipelineHelper(node, spliterator, sourceFlags,
+                                                                            opsFlags, ops, from, upTo));
     }
 
-    private <R> R evaluateParallel(StreamAccessor<?> source,
-                                   int[] opsFlags,
-                                   IntermediateOp[] ops, int from, int upTo,
-                                   TerminalOp<E_OUT, R> terminal) {
-        return terminal.evaluateParallel(new ParallelImplPipelineHelper<>(source, opsFlags, ops, from, upTo));
+    @SuppressWarnings("unchecked")
+    private <R> R evaluateSequential(Node<?> node,
+                                     Spliterator<?> spliterator,
+                                     int sourceFlags,
+                                     int[] opsFlags,
+                                     IntermediateOp[] ops, int from, int upTo,
+                                     TerminalOp<E_OUT, R> terminal) {
+        return (R) terminal.evaluateSequential(new SequentialImplPipelineHelper(node, spliterator, sourceFlags,
+                                                                                opsFlags, ops, from, upTo));
     }
 
     @SuppressWarnings("unchecked")
@@ -183,34 +200,35 @@
         return (R) terminal.evaluateSequential(new SequentialImplPipelineHelperSource());
     }
 
-    private <R> R evaluateSequential(StreamAccessor<?> source,
-                                     int[] opsFlags,
-                                     IntermediateOp[] ops, int from, int upTo,
-                                     TerminalOp<E_OUT, R> terminal) {
-        return terminal.evaluateSequential(new SequentialImplPipelineHelper<>(source, opsFlags, ops, from, upTo));
-    }
-
     static abstract class AbstractPipelineHelper<P_IN, P_OUT> implements PipelineHelper<P_IN, P_OUT> {
-        protected final StreamAccessor<P_IN> source;
+        final Node<P_IN> node;
+        final Spliterator<P_IN> spliterator;
         final IntermediateOp[] ops;
         final int[] opsFlags;
         final int from;
         final int upTo;
+        final int sourceFlags;
         final int sourceAndOpsFlags;
 
-        AbstractPipelineHelper(StreamAccessor<P_IN> source, int[] opsFlags, IntermediateOp[] ops, int from, int upTo) {
-            this.source = source;
+        AbstractPipelineHelper(Spliterator<P_IN> spliterator, int sourceFlags, int[] opsFlags, IntermediateOp[] ops, int from, int upTo) {
+            this(null, spliterator, sourceFlags, opsFlags, ops, from, upTo);
+        }
+
+        AbstractPipelineHelper(Node<P_IN> node, Spliterator<P_IN> spliterator, int sourceFlags, int[] opsFlags, IntermediateOp[] ops, int from, int upTo) {
+            this.node = node;
+            this.spliterator = spliterator;
             this.opsFlags = opsFlags;
             this.ops = ops;
             this.from = from;
             this.upTo = upTo;
+            this.sourceFlags = sourceFlags;
 
             int flags = opsFlags[from] = StreamOpFlags.INITIAL_OPS_VALUE;
             for (int i = from; i < upTo; i++) {
                 flags = opsFlags[i + 1] = StreamOpFlags.combineOpFlags(ops[i].getOpFlags(), flags);
             }
 
-            this.sourceAndOpsFlags = StreamOpFlags.combineStreamFlags(source.getStreamFlags(), flags);
+            this.sourceAndOpsFlags = StreamOpFlags.combineStreamFlags(sourceFlags, flags);
         }
 
         protected boolean isOutputSizeKnown() {
@@ -228,8 +246,8 @@
         @Override
         public <S extends Sink<P_OUT>> S into(S sink) {
             Sink<P_IN> wrappedSink = wrapSink(sink);
-            wrappedSink.begin(source.getSizeIfKnown());
-            source.forEach(wrappedSink);
+            wrappedSink.begin(spliterator.getSizeIfKnown());
+            spliterator.forEach(wrappedSink);
             wrappedSink.end();
             return sink;
         }
@@ -241,7 +259,7 @@
 
         @Override
         public int getOutputSizeIfKnown() {
-            return isOutputSizeKnown() ? source.getSizeIfKnown() : -1;
+            return isOutputSizeKnown() ? spliterator.getSizeIfKnown() : -1;
         }
 
         @Override
@@ -250,7 +268,7 @@
 
             for (int i = upTo - 1; i >= from; i--) {
                 sink = ops[i].wrapSink(
-                        StreamOpFlags.combineStreamFlags(source.getStreamFlags(), opsFlags[i]),
+                        StreamOpFlags.combineStreamFlags(sourceFlags, opsFlags[i]),
                         sink);
             }
             return sink;
@@ -262,7 +280,7 @@
 
             for (int i = from; i < upTo; i++) {
                 it = ops[i].wrapIterator(
-                        StreamOpFlags.combineStreamFlags(source.getStreamFlags(), opsFlags[i]),
+                        StreamOpFlags.combineStreamFlags(sourceFlags, opsFlags[i]),
                         it);
             }
             return it;
@@ -271,18 +289,14 @@
 
     class SequentialImplPipelineHelper<P_IN> extends AbstractPipelineHelper<P_IN, E_OUT> {
 
-        SequentialImplPipelineHelper(StreamAccessor<P_IN> source, IntermediateOp[] ops) {
+        SequentialImplPipelineHelper(Spliterator<P_IN> spliterator, int sourceFlags, IntermediateOp[] ops) {
             // Start from 2nd element since 1st is the head containing the source
-            super(source, new int[ops.length + 1], ops, 0, ops.length);
+            super(spliterator, sourceFlags, new int[ops.length + 1], ops, 0, ops.length);
         }
 
-        SequentialImplPipelineHelper(StreamAccessor<P_IN> source, int[] opsFlags, IntermediateOp[] ops, int from, int to) {
-            super(source, opsFlags, ops, from, to);
-        }
-
-        @Override
-        public int getOutputSizeIfKnown() {
-            return isOutputSizeKnown() ? source.getSizeIfKnown() : -1;
+        SequentialImplPipelineHelper(Node<P_IN> node, Spliterator<P_IN> spliterator, int sourceFlags,
+                                     int[] opsFlags, IntermediateOp[] ops, int from, int to) {
+            super(node, spliterator, sourceFlags, opsFlags, ops, from, to);
         }
 
         boolean requirePull() {
@@ -306,10 +320,8 @@
 
         @Override
         public Node<E_OUT> collectOutput() {
-            if (hasZeroDepth() && source instanceof Nodes.NodeStreamAccessor) {
-                // If the stream accessor holds a node
-                // @@@ can abstract with StreamAccessor.asArray ?
-                return ((Nodes.NodeStreamAccessor<E_OUT>) source).asNode();
+            if (hasZeroDepth() && node != null) {
+                return (Node<E_OUT>) node;
             }
             else
                 return into(Nodes.<E_OUT>makeBuilder(getOutputSizeIfKnown())).build();
@@ -317,14 +329,14 @@
 
         @Override
         public Iterator<E_OUT> iterator() {
-            return wrapIterator(source.iterator());
+            return wrapIterator(spliterator.iterator());
         }
     }
 
     class SequentialImplPipelineHelperSource extends SequentialImplPipelineHelper {
 
         <R> SequentialImplPipelineHelperSource() {
-            super(AbstractPipeline.this.source, ops());
+            super(AbstractPipeline.this.spliterator, AbstractPipeline.this.sourceFlags, ops());
         }
 
         @Override
@@ -343,16 +355,18 @@
         }
     }
 
-    class ParallelImplPipelineHelper<P_IN> extends AbstractPipelineHelper<P_IN, E_OUT> implements ParallelPipelineHelper<P_IN, E_OUT> {
+    class ParallelImplPipelineHelper<P_IN> extends AbstractPipelineHelper<P_IN, E_OUT>
+            implements ParallelPipelineHelper<P_IN, E_OUT> {
         final long targetSize;
 
-        ParallelImplPipelineHelper(StreamAccessor<P_IN> source, int[] opsFlags, IntermediateOp[] ops, int from, int to) {
-            super(source, opsFlags, ops, from, to);
+        ParallelImplPipelineHelper(Node<P_IN> node, Spliterator<P_IN> spliterator, int sourceFlags,
+                                   int[] opsFlags, IntermediateOp[] ops, int from, int to) {
+            super(node, spliterator, sourceFlags, opsFlags, ops, from, to);
             this.targetSize = calculateTargetSize();
         }
 
         private long calculateTargetSize() {
-            int estimate = source.estimateSize();
+            int estimate = spliterator.estimateSize();
             return estimate >= 0
                    ? ForkJoinUtils.suggestTargetSize(estimate)
                    : 2; // @@@ SWAG
@@ -372,25 +386,22 @@
 
         @Override
         public Iterator<E_OUT> iterator() {
-            return wrapIterator(source.iterator());
+            return wrapIterator(spliterator.iterator());
         }
 
         @Override
         public Spliterator<P_IN> spliterator() {
-            return source.spliterator();
+            return spliterator;
         }
 
         @Override
         public Node<E_OUT> collectOutput() {
-            if (hasZeroDepth() && source instanceof Nodes.NodeStreamAccessor) {
-                // If the stream accessor holds a node
-                // @@@ can abstract with StreamAccessor.asArray ?
-                return ((Nodes.NodeStreamAccessor<E_OUT>) source).asNode();
+            if (hasZeroDepth() && node != null) {
+                return (Node<E_OUT>) node;
             }
             else
                 return TreeUtils.collect(this, false);
         }
-
     }
 
     private IntermediateOp[] ops() {
@@ -431,14 +442,14 @@
 
             // Check if this is the first pull and if so get the source iterator
             if (pipes[0].iterator == null) {
-                pipes[0].iterator = source.iterator();
+                pipes[0].iterator = spliterator.iterator();
             }
 
             int opsFlags = StreamOpFlags.INITIAL_OPS_VALUE;
             for (int i = 1; i <= depth; i++) {
                 p = pipes[i];
                 if (p.iterator == null) {
-                    p.iterator = p.op.wrapIterator(StreamOpFlags.combineStreamFlags(source.getStreamFlags(), opsFlags),
+                    p.iterator = p.op.wrapIterator(StreamOpFlags.combineStreamFlags(sourceFlags, opsFlags),
                                                    pipes[i - 1].iterator);
                 }
                 opsFlags = StreamOpFlags.combineOpFlags(pipes[i].op.getOpFlags(), opsFlags);
@@ -448,15 +459,15 @@
     }
 
     public boolean isParallel() {
-        return source.isParallel();
+        return StreamOpFlags.PARALLEL.isKnown(sourceFlags);
     }
 
     public StreamShape getShape() {
-        return op == null ? source.getShape() : op.outputShape();
+        return op == null ? shape : op.outputShape();
     }
 
     public StreamShape getInputShape() {
-        return source.getShape();
+        return shape;
     }
 
     public<R> R pipeline(TerminalOp<E_OUT, R> terminal) {
--- a/src/share/classes/java/util/streams/Spliterator.java	Fri Nov 02 11:36:01 2012 +0100
+++ b/src/share/classes/java/util/streams/Spliterator.java	Sat Nov 03 20:12:39 2012 +0100
@@ -78,12 +78,12 @@
     /**
      * Provides any remaining elements into the provided sink.
      *
-     * @param sink The sink to which elements will be provided.
+     * @param block The sink to which elements will be provided.
      */
-    void forEach(Block<? super T> sink) default {
+    void forEach(Block<? super T> block) default {
         Iterator<T> remaining = iterator();
         while (remaining.hasNext()) {
-            sink.apply(remaining.next());
+            block.apply(remaining.next());
         }
     }
 
@@ -124,9 +124,7 @@
      * @return {@code true} if size information is available for this
      * Spliterator and all sub-splits.
      */
-    boolean isPredictableSplits() default {
-        return false;
-    }
+    boolean isPredictableSplits();
 
     /**
      * Ascertain if the spliterator should be split further or not, based on a target leaf size.
@@ -138,4 +136,30 @@
         int remaining = estimateSize();
         return (remaining <= targetSize && remaining >= 0) || (getNaturalSplits() == 0);
     }
+
+    /**
+     * A Spliterator that can only be used to sequentially traverse a source.
+     * @param <T>
+     */
+    public interface Sequential<T> extends Spliterator<T> {
+        @Override
+        int getNaturalSplits() default {
+            return 0;
+        }
+
+        @Override
+        Spliterator<T> split() default {
+            return Streams.emptySpliterator();
+        }
+
+        @Override
+        boolean isPredictableSplits() default {
+            return false;
+        }
+
+        @Override
+        boolean shouldNotSplit(long targetSize) default {
+            return true;
+        }
+    }
 }
--- a/src/share/classes/java/util/streams/StreamAccessor.java	Fri Nov 02 11:36:01 2012 +0100
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,130 +0,0 @@
-/*
- * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved.
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This code is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 only, as
- * published by the Free Software Foundation.  Oracle designates this
- * particular file as subject to the "Classpath" exception as provided
- * by Oracle in the LICENSE file that accompanied this code.
- *
- * This code is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
- * version 2 for more details (a copy is included in the LICENSE file that
- * accompanied this code).
- *
- * You should have received a copy of the GNU General Public License version
- * 2 along with this work; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
- *
- * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
- * or visit www.oracle.com if you need additional information or have any
- * questions.
- */
-package java.util.streams;
-
-import java.util.Iterator;
-import java.util.functions.Block;
-
-/**
- * StreamAccessor
- *
- * @param <T> Type of elements
- *
- * @author Brian Goetz
- */
-public interface StreamAccessor<T> {
-
-    /**
-     * Provides any remaining elements into the provided sink.
-     *
-     * @param sink The sink to which elements will be provided.
-     */
-    void forEach(Block<? super T> sink);
-
-    /**
-     * Return the iterator for the elements of this stream. The same iterator
-     * instance is returned for every invocation.  Once the elements of the
-     * stream are consumed it is not possible to "rewind" the stream.
-     *
-     * @return the element iterator for this stream.
-     */
-    Iterator<T> iterator();
-
-    /**
-     * Get the known properties of the stream.
-     *
-     * @return the known properties of the stream.
-     * @see {@link StreamOpFlags}
-     */
-    public int getStreamFlags();
-
-   /**
-     * Returns the count of elements currently retained by this Stream.
-     * This is the count which would be processed by {@code into()} or via
-     * {@code iterator()}. If the element count is infinite or cannot be
-     * computed exactly and cheaply then {@code -1} is returned.
-     * <p/>
-     * @return The number of remaining elements or {@code -1} if the remaining
-     * element count is unavailable.
-     */
-    int getSizeIfKnown() default {
-        return -1;
-    }
-
-    /**
-     * Returns an estimate, possibly inexact, of the count of elements currently
-     * retained by this Stream. This is the count which would be processed
-     * by {@code into()} or via {@code iterator()}. If an element count is
-     * infinite or an estimate cannot be computed cheaply then {@code -1} is
-     * returned.
-     * <p>
-     * If {@code getSizeIfKnown()} returns a non-negative integer then
-     * {@code estimateSize()} <strong>must</strong> return the same value.
-     *
-     * @return The number of remaining elements or {@code -1} if the remaining
-     * element count is unavailable.
-     */
-    int estimateSize() default {
-        return getSizeIfKnown();
-    }
-
-    /**
-     * If {@code true} then source is usable from multiple threads simultaneously.
-     *
-     * @return {@code true} then source is usable from multiple threads
-     * simultaneously.
-     */
-    boolean isParallel();
-
-    StreamShape getShape() default {
-        return StreamShape.VALUE;
-    }
-
-    /**
-     * Returns a spliterator for this stream. Must be requested before
-     * {@code iterator()} is called.
-     *
-     * @return a spliterator for this stream.
-     */
-    Spliterator<T> spliterator();
-
-    public interface ForSequential<T> extends StreamAccessor<T> {
-        @Override
-        public boolean isParallel() default {
-            return false;
-        }
-
-        @Override
-        public Spliterator<T> spliterator() default {
-            throw new UnsupportedOperationException();
-        }
-
-        // @@@ This isn't truly associated with Sequentialness.
-        @Override
-        public int getStreamFlags() default {
-            return 0;
-        }
-    }
-}
--- a/src/share/classes/java/util/streams/StreamOpFlags.java	Fri Nov 02 11:36:01 2012 +0100
+++ b/src/share/classes/java/util/streams/StreamOpFlags.java	Sat Nov 03 20:12:39 2012 +0100
@@ -46,7 +46,9 @@
 
     SIZED(3),
 
-    SHORT_CIRCUIT(4, false)
+    SHORT_CIRCUIT(4, false),
+
+    PARALLEL(5)
     ;
 
     /**
@@ -228,6 +230,16 @@
      */
     public static final int IS_SHORT_CIRCUIT = SHORT_CIRCUIT.set;
 
+    /**
+     * The stream can be decomposed for parallel evaluation
+     */
+    public static final int IS_PARALLEL = PARALLEL.set;
+
+    /**
+     * The stream cannot be decomposed for parallel evaluation
+     */
+    public static final int NOT_PARALLEL = PARALLEL.clear;
+
     private static int getMask(int flags) {
         return (flags == 0)
                ? FLAG_MASK
--- a/src/share/classes/java/util/streams/Streams.java	Fri Nov 02 11:36:01 2012 +0100
+++ b/src/share/classes/java/util/streams/Streams.java	Sat Nov 03 20:12:39 2012 +0100
@@ -35,38 +35,94 @@
  * @author Brian Goetz
  */
 public class Streams {
+    private static final Stream EMPTY_STREAM = stream(Collections.emptyList(), 0);
+
     private Streams() {
         throw new Error("no instances");
     }
 
     // Stream
 
-    public static<T> Stream<T> stream(Collection<T> source) {
-        return new ValuePipeline<>(new CollectionStreamAccessor<>(source));
+    public static<U, T extends Sized & Iterable<U>> Stream<U> stream(T entity, int flags) {
+        return new ValuePipeline<>(new Spliterator.Sequential<U>() {
+            @Override
+            public Iterator<U> iterator() {
+                return entity.iterator();
+            }
+
+            @Override
+            public void forEach(Block<? super U> block) {
+                entity.forEach(block);
+            }
+
+            @Override
+            public int getSizeIfKnown() {
+                return entity.size();
+            }
+        }, StreamOpFlags.IS_SIZED | flags);
     }
 
-    public static<T> Stream<T> stream(Collection<T> source, int flags) {
-        return new ValuePipeline<>(new CollectionStreamAccessor<>(source, flags));
+    public static<U, T extends Iterable<U>> Stream<U> stream(T entity, Sized sizeProvider, int flags) {
+        return new ValuePipeline<>(new Spliterator.Sequential<U>() {
+            @Override
+            public Iterator<U> iterator() {
+                return entity.iterator();
+            }
+
+            @Override
+            public void forEach(Block<? super U> block) {
+                entity.forEach(block);
+            }
+
+            @Override
+            public int getSizeIfKnown() {
+                return sizeProvider.size();
+            }
+        }, StreamOpFlags.IS_SIZED | flags);
     }
 
-    public static<T> Stream<T> stream(Iterable<T> source) {
-        return new ValuePipeline<>(new IterableStreamAccessor<>(source));
+    public static<U, T extends Iterable<U>> Stream<U> stream(T entity, int flags) {
+        return new ValuePipeline<>(new Spliterator.Sequential<U>() {
+            @Override
+            public Iterator<U> iterator() {
+                return entity.iterator();
+            }
+
+            @Override
+            public void forEach(Block<? super U> block) {
+                entity.forEach(block);
+            }
+            // @@@ Mask off sized if set ?
+        }, flags);
     }
 
-    public static<T> Stream<T> stream(Iterable<T> source, int sizeOrUnknown) {
-        return new ValuePipeline<>(new IterableStreamAccessor<>(source, sizeOrUnknown));
+    public static<U, T extends Iterator<U>> Stream<U> stream(T iterator, int flags) {
+        return new ValuePipeline<>(new Spliterator.Sequential<U>() {
+            @Override
+            public Iterator<U> iterator() {
+                return iterator;
+            }
+
+            @Override
+            public void forEach(Block<? super U> block) {
+                iterator.forEach(block);
+            }
+            // @@@ Mask off sized if set ?
+        }, flags);
     }
 
-    public static<T> Stream<T> stream(Iterable<T> source, int sizeOrUnknown, int flags) {
-        return new ValuePipeline<>(new IterableStreamAccessor<>(source, sizeOrUnknown, flags));
+    public static<U> Stream<U> stream(Spliterator.Sequential<U> spliterator, int flags) {
+        if (spliterator.getSizeIfKnown() >= 0)
+            flags |= StreamOpFlags.IS_SIZED;
+        return new ValuePipeline<>(spliterator, flags);
     }
 
-    public static <T> Stream<T> stream(ArrayProxy<T> proxy) {
-        return new ValuePipeline<>(new ArrayProxyStreamAccessor<>(proxy));
-    }
+//    public static <T> Stream<T> stream(ArrayProxy<T> proxy) {
+//        return new ValuePipeline<>(new ArrayProxyStreamAccessor<>(proxy));
+//    }
 
     public static <T> Spliterator<T> spliterator(T[] source) {
-        return new ArraySpliterator<>(source);
+        return spliterator(source, 0, source.length);
     }
 
     public static <T> Spliterator<T> spliterator(T[] source, int offset, int length) {
@@ -74,52 +130,56 @@
     }
 
     public static <T> Stream<T> stream(T[] source) {
-        return new ValuePipeline<>(new ArrayStreamAccessor<>(source));
+        return stream(source, 0, source.length);
     }
 
     public static <T> Stream<T> stream(T[] source, int offset, int length) {
-        return new ValuePipeline<>(new ArrayStreamAccessor<>(source, offset, length));
+        // Note use of full-service Spliterator here -- harmless because PARALLEL flag is not set
+        return new ValuePipeline<>(new ArraySpliterator<>(source, offset, length),
+                                   StreamOpFlags.IS_SIZED | StreamOpFlags.IS_ORDERED);
     }
 
     public static <T> Stream<T> parallel(T[] source) {
-        return new ValuePipeline<>(new ArrayParallelStreamAccessor<>(source));
+        return Streams.parallel(source, 0, source.length);
     }
 
     public static <T> Stream<T> parallel(T[] source, int offset, int length) {
-        return new ValuePipeline<>(new ArrayParallelStreamAccessor<>(source, offset, length));
+        return new ValuePipeline<>(new ArraySpliterator<>(source, offset, length),
+                                   StreamOpFlags.IS_SIZED | StreamOpFlags.IS_ORDERED | StreamOpFlags.IS_PARALLEL);
     }
 
-    public static<T> Stream<T> parallel(Spliterator<T> source) {
-        return new ValuePipeline<>(new SpliteratorStreamAccessor<>(source));
+    public static<T> Stream<T> parallel(Spliterator<T> spliterator, int flags) {
+        if (spliterator.getSizeIfKnown() >= 0)
+            flags |= StreamOpFlags.IS_SIZED;
+        return new ValuePipeline<>(spliterator, flags | StreamOpFlags.IS_PARALLEL);
     }
 
-    public static<T> Stream<T> parallel(Spliterator<T> source, int flags) {
-        return new ValuePipeline<>(new SpliteratorStreamAccessor<>(source, flags));
+//    public static <T> Stream<T> parallel(ArrayProxy<T> proxy) {
+//        return new ValuePipeline<>(new ArrayProxyParallelStreamAccessor<>(proxy));
+//    }
+
+    @SuppressWarnings("unchecked")
+    public static<T> Stream<T> emptyStream() {
+        return EMPTY_STREAM;
     }
 
-    public static <T> Stream<T> parallel(ArrayProxy<T> proxy) {
-        return new ValuePipeline<>(new ArrayProxyParallelStreamAccessor<>(proxy));
-    }
-
-
     /**
      * A small interface that defers getting of array data.
      *
      * @param <T> type of elements.
      */
-    public interface ArrayProxy<T> {
-        int getModCount();
-        T[] getArray();
-        int getOffset();
-        int getLen();
-    }
+//    public interface ArrayProxy<T> {
+//        int getModCount();
+//        T[] getArray();
+//        int getOffset();
+//        int getLen();
+//    }
 
     // Infinite streams
 
     public static<T> Stream<T> iterate(final T seed, final UnaryOperator<T> f) {
         Objects.requireNonNull(f);
-
-        final InfiniteIterator<T> iterate = new InfiniteIterator<T>() {
+        final InfiniteIterator<T> iterator = new InfiniteIterator<T>() {
             T t = null;
 
             @Override
@@ -127,8 +187,7 @@
                 return t = (t == null) ? seed : f.operate(t);
             }
         };
-
-        return new ValuePipeline<>(new InfiniteIteratorStreamAccessor<>(iterate));
+        return stream(iterator, StreamOpFlags.IS_ORDERED);
     }
 
     public static<T> Stream<T> repeat(T t) {
@@ -137,32 +196,11 @@
 
     public static<T> Stream<T> repeat(final int n, final T t) {
         if (n < 0) {
-            final InfiniteIterator<T> iterate = () -> t;
-
-            return new ValuePipeline<>(new InfiniteIteratorStreamAccessor<>(iterate));
+            InfiniteIterator<T> iterator = () -> t;
+            return stream(iterator, StreamOpFlags.IS_ORDERED);
         }
-        else {
-            final Iterator<T> repeat = new Iterator<T>() {
-                int c = n;
-
-                @Override
-                public boolean hasNext() {
-                    return c > 0;
-                }
-
-                @Override
-                public T next() {
-                    if (!hasNext()) {
-                        throw new NoSuchElementException();
-                    }
-
-                    c--;
-                    return t;
-                }
-            };
-
-            return new ValuePipeline<>(new IteratorStreamAccessor<>(repeat, -1, StreamOpFlags.IS_ORDERED));
-        }
+        else
+            return repeatedly(n, () -> t);
     }
 
     public static<T> Stream<T> repeatedly(Factory<T> f) {
@@ -173,9 +211,8 @@
         Objects.requireNonNull(f);
 
         if (n < 0) {
-            final InfiniteIterator<T> repeatedly = () -> f.make();
-
-            return new ValuePipeline<>(new InfiniteIteratorStreamAccessor<>(repeatedly));
+            InfiniteIterator<T> iterator = f::make;
+            return stream(iterator, StreamOpFlags.IS_ORDERED);
         }
         else {
             final Iterator<T> repeatedly = new Iterator<T>() {
@@ -197,7 +234,7 @@
                 }
             };
 
-            return new ValuePipeline<>(new IteratorStreamAccessor<>(repeatedly,  -1, StreamOpFlags.IS_ORDERED));
+            return stream(repeatedly, StreamOpFlags.IS_ORDERED);
         }
     }
 
@@ -206,7 +243,7 @@
 
         // Check if the source is empty
         if (!source.iterator().hasNext()) {
-            return Collections.<T>emptyList().stream();
+            return emptyStream();
         }
 
         final InfiniteIterator<T> cycle = new InfiniteIterator<T>() {
@@ -222,7 +259,7 @@
             }
         };
 
-        return new ValuePipeline<>(new InfiniteIteratorStreamAccessor<>(cycle));
+        return stream(cycle, StreamOpFlags.IS_ORDERED);
     }
 
     // @@@ Need from(StreamAccessor) methods
@@ -245,7 +282,7 @@
             }
 
             @Override
-            public void forEach(Block<? super Object> sink) {
+            public void forEach(Block<? super Object> block) {
             }
 
             @Override
@@ -263,59 +300,6 @@
         return (Spliterator<T>) EMPTY_SPLITERATOR;
     }
 
-    private static class IteratorStreamAccessor<T>
-            implements StreamAccessor.ForSequential<T>, Iterator<T> {
-        private final Iterator<T> it;
-        private final int sizeOrUnknown;
-        private final int flags;
-
-        public IteratorStreamAccessor(Iterator<T> it) {
-            this(it, -1, 0);
-        }
-
-        public IteratorStreamAccessor(Iterator<T> it, int sizeOrUnknown) {
-            this(it, sizeOrUnknown, 0);
-        }
-
-        public IteratorStreamAccessor(Iterator<T> it, int sizeOrUnknown, int flags) {
-            this.it = it;
-            this.sizeOrUnknown = sizeOrUnknown;
-            this.flags = (sizeOrUnknown >= 0 ? StreamOpFlags.IS_SIZED : 0) |
-                   (flags & (StreamOpFlags.IS_DISTINCT | StreamOpFlags.IS_SORTED | StreamOpFlags.IS_ORDERED));
-        }
-
-        @Override
-        public void forEach(Block<? super T> sink) {
-            while (it.hasNext())
-                sink.apply(it.next());
-        }
-
-        @Override
-        public T next() {
-            return it.next();
-        }
-
-        @Override
-        public boolean hasNext() {
-            return it.hasNext();
-        }
-
-        @Override
-        public Iterator<T> iterator() {
-            return it;
-        }
-
-        @Override
-        public int getStreamFlags() {
-            return flags;
-        }
-
-        @Override
-        public int getSizeIfKnown() {
-            return sizeOrUnknown;
-        }
-    }
-
     private static interface InfiniteIterator<T> extends Iterator<T> {
         @Override
         public boolean hasNext() default {
@@ -323,413 +307,215 @@
         }
     }
 
-    // @@@ Limited to sequential access, thus parallel evaluation cannot occur
-    //     Change later to implement StreamAccessor.spliterator
-    private static class InfiniteIteratorStreamAccessor<T>
-            implements StreamAccessor.ForSequential<T>, InfiniteIterator<T> {
-        private final InfiniteIterator<T> it;
+    private static class ArrayIterator<T> implements Iterator<T> {
+        protected final T[] elements;
+        protected final int endOffset;
+        protected int curOffset;
 
-        public InfiniteIteratorStreamAccessor(InfiniteIterator<T> it) {
-            this.it = it;
+        private ArrayIterator(T[] elements, int startOffset, int len) {
+            this.elements = Objects.requireNonNull(elements);
+            this.endOffset = startOffset + len;
+            this.curOffset = startOffset;
+
+            assert curOffset >= 0 : "offset not positive";
+            assert endOffset >= curOffset : "end lower than start";
+            assert (curOffset < elements.length) || (0 == len && 0 == curOffset) : "offset not in array";
+            assert endOffset <= elements.length : "end not in array";
         }
 
-        @Override
-        public void forEach(Block<? super T> sink) {
-            // Implementing this method would result in a infinite loop
-            // @@@ No mechanism for ops to short circuit push loop
-            // @@@ Spliterator to the rescue? arity of 1, n elements for lhs, rhs is always infinite
-            throw new UnsupportedOperationException("Infinite streams cannot push to a sink");
-        }
+        public T next() {
+            if (!hasNext()) {
+                throw new NoSuchElementException();
+            }
 
-        @Override
-        public T next() {
-            return it.next();
-        }
-
-        @Override
-        public Iterator<T> iterator() {
-            return it;
-        }
-
-        @Override
-        public int getStreamFlags() {
-            // @@@ Should encounter order be set by default?
-            return StreamOpFlags.IS_ORDERED;
-        }
-
-        @Override
-        public int getSizeIfKnown() {
-            return -1;
-        }
-    }
-
-    private static class SpliteratorStreamAccessor<T> implements StreamAccessor<T> {
-        private final Spliterator<T> spliterator;
-        private final int flags;
-
-        public SpliteratorStreamAccessor(Spliterator<T> it) {
-            this(it, 0);
-        }
-
-        public SpliteratorStreamAccessor(Spliterator<T> spliterator, int flags) {
-            this.spliterator = spliterator;
-            this.flags = (spliterator.getSizeIfKnown() >= 0 ? StreamOpFlags.IS_SIZED : 0) |
-                         (flags & (StreamOpFlags.IS_DISTINCT | StreamOpFlags.IS_SORTED | StreamOpFlags.IS_ORDERED));
-        }
-
-        @Override
-        public void forEach(Block<? super T> sink) {
-            spliterator.forEach(sink);
-        }
-
-        @Override
-        public int getStreamFlags() {
-            return flags;
-        }
-
-        @Override
-        public int getSizeIfKnown() {
-            return spliterator.getSizeIfKnown();
-        }
-
-        @Override
-        public Iterator<T> iterator() {
-            return spliterator.iterator();
-        }
-
-        @Override
-        public Spliterator<T> spliterator() {
-            return spliterator;
-        }
-
-        @Override
-        public StreamShape getShape() {
-            return StreamShape.VALUE;
-        }
-
-        @Override
-        public boolean isParallel() {
-            return true;
-        }
-    }
-
-    private static class IterableStreamAccessor<T>
-            implements StreamAccessor.ForSequential<T> {
-        private final Iterable<T> iterable;
-        private final int flags;
-        private final int sizeOrUnknown;
-        Iterator<T> iterator = null;
-
-        IterableStreamAccessor(Iterable<T> iterable) {
-            this(iterable, -1, 0);
-        }
-
-        IterableStreamAccessor(Iterable<T> iterable, int sizeOrUnknown) {
-            this(iterable, sizeOrUnknown, 0);
-        }
-
-        IterableStreamAccessor(Iterable<T> iterable, int sizeOrUnknown, int flags) {
-            this.iterable = iterable;
-            this.flags = (sizeOrUnknown >= 0 ? StreamOpFlags.IS_SIZED : 0) |
-                         (flags & (StreamOpFlags.IS_DISTINCT | StreamOpFlags.IS_SORTED | StreamOpFlags.IS_ORDERED));
-            this.sizeOrUnknown = sizeOrUnknown;
-        }
-
-        public Iterator<T> iterator() {
-            if (iterator == null)
-                iterator = iterable.iterator();
-            return iterator;
-        }
-
-        @Override
-        public void forEach(Block<? super T> sink) {
-            if (iterator == null) {
-                iterable.forEach(sink);
-                iterator = Collections.emptyIterator();
-            }
-            else {
-                while (iterator.hasNext())
-                    sink.apply(iterator.next());
-            }
-        }
-
-        @Override
-        public int getSizeIfKnown() {
-            return sizeOrUnknown;
-        }
-
-        @Override
-        public int getStreamFlags() {
-            return flags;
-        }
-    }
-
-    // @@@ Perhaps replace with SizedIterable when available
-    private static class CollectionStreamAccessor<T> implements StreamAccessor.ForSequential<T> {
-        private final Collection<T> col;
-        private final int flags;
-        Iterator<T> iterator = null;
-
-        CollectionStreamAccessor(Collection<T> col) {
-            this(col, 0);
-        }
-
-        CollectionStreamAccessor(Collection<T> col, int flags) {
-            this.col = col;
-            this.flags = StreamOpFlags.IS_SIZED |
-                  (flags & (StreamOpFlags.IS_DISTINCT | StreamOpFlags.IS_SORTED | StreamOpFlags.IS_ORDERED));
-        }
-
-        public Iterator<T> iterator() {
-            if (iterator == null)
-                iterator = col.iterator();
-            return iterator;
-        }
-
-        @Override
-        public void forEach(Block<? super T> sink) {
-            if (iterator == null) {
-                col.forEach(sink);
-                iterator = Collections.emptyIterator();
-            }
-            else {
-                while (iterator.hasNext())
-                    sink.apply(iterator.next());
-            }
-        }
-
-        @Override
-        public int getStreamFlags() {
-            return flags;
-        }
-
-        @Override
-        public int getSizeIfKnown() {
-            return col.size();
-        }
-    }
-
-    private abstract static class RandomAccessIterator<T> implements Iterator<T> {
-        protected final int endOffset;
-        protected int offset;
-
-        protected RandomAccessIterator(int startOffset, int len) {
-            this.endOffset = startOffset + len;
-            this.offset = startOffset;
-
-            assert offset >= 0 : "offset not positive";
-            assert endOffset >= offset : "end lower than start";
+            return elements[curOffset++];
         }
 
         @Override
         public final boolean hasNext() {
-            return offset < endOffset;
-        }
-
-        @Override
-        public abstract T next();
-    }
-
-    private static class ArrayIterator<T> extends RandomAccessIterator<T> {
-        protected final T[] elements;
-
-        private ArrayIterator(T[] elements, int startOffset, int len) {
-            super(startOffset, len);
-            this.elements = Objects.requireNonNull(elements);
-
-            assert (offset < elements.length) || (0 == len && 0 == offset) : "offset not in array";
-            assert endOffset <= elements.length : "end not in array";
-        }
-
-        @Override
-        public T next() {
-            if(!hasNext()) {
-                throw new NoSuchElementException();
-            }
-
-            return elements[offset++];
+            return curOffset < endOffset;
         }
     }
 
-    private static class ArrayProxyIterator<T> implements Iterator<T> {
-        protected final ArrayProxy proxy;
-        protected int expectedmodcount = -1;
-        protected int offset = -1;
-        protected int endOffset = -1;
+//    private static class ArrayProxyIterator<T> implements Iterator<T> {
+//        protected final ArrayProxy<T> proxy;
+//        protected int expectedmodcount = -1;
+//        protected int offset = -1;
+//        protected int endOffset = -1;
+//
+//        private ArrayProxyIterator(ArrayProxy<T> proxy) {
+//            this.proxy = proxy;
+//        }
+//
+//        protected void comodificationCheck() {
+//            if(expectedmodcount == proxy.getModCount()) {
+//                return;
+//            }
+//
+//            if(-1 != expectedmodcount) {
+//                throw new ConcurrentModificationException();
+//            }
+//
+//            // initalize offset, end offset and leng
+//            expectedmodcount = proxy.getModCount();
+//            offset = proxy.getOffset();
+//            int len = proxy.getLen();
+//            assert len >= 0 : "invalid length";
+//
+//            endOffset = offset + len;
+//            assert (offset >= 0) && (offset <= endOffset) : "offset not in array";
+//            assert (offset <= proxy.getArray().length) && (endOffset <= proxy.getArray().length);
+//        }
+//
+//        @Override
+//        public boolean hasNext() {
+//            comodificationCheck();
+//
+//            return offset < endOffset;
+//        }
+//
+//        @Override
+//        public T next() {
+//            if(!hasNext()) {
+//                throw new NoSuchElementException();
+//            }
+//
+//            return proxy.getArray()[offset++];
+//        }
+//
+//        public void forEach(Block<? super T> sink) {
+//            comodificationCheck();
+//
+//            T[] elements = proxy.getArray();
+//
+//            for (int i=offset; i<endOffset; i++) {
+//                sink.apply(elements[i]);
+//            }
+//            // update only once; reduce heap write traffic
+//            offset = endOffset;
+//
+//            // too late to be useful but may find errors.
+//            comodificationCheck();
+//        }
+//    }
 
-        private ArrayProxyIterator(ArrayProxy<T> proxy) {
-            this.proxy = proxy;
-        }
+//    private static class ArrayProxyStreamAccessor<T>
+//            extends ArrayProxyIterator<T> implements StreamAccessor.ForSequential<T> {
+//
+//        ArrayProxyStreamAccessor(ArrayProxy<T> proxy) {
+//            super(proxy);
+//        }
+//
+//        @Override
+//        public int getStreamFlags() {
+//            return StreamOpFlags.IS_SIZED | StreamOpFlags.IS_ORDERED;
+//        }
+//
+//        @Override
+//        public int getSizeIfKnown() {
+//            comodificationCheck();
+//
+//            return endOffset - offset;
+//        }
+//
+//        @Override
+//        public Iterator<T> iterator() {
+//            return this;
+//        }
+//    }
 
-        protected void comodificationCheck() {
-            if(expectedmodcount == proxy.getModCount()) {
-                return;
-            }
+//    private static class ArrayProxySpliterator<T> extends ArrayProxyStreamAccessor<T> implements Spliterator<T> {
+//        boolean traversing = false;
+//
+//        ArrayProxySpliterator(ArrayProxy<T> proxy) {
+//            super(proxy);
+//        }
+//
+//        /**
+//         * Constructor for sub-splits.
+//         *
+//         * @param proxy
+//         * @param offset offset of sub-split.
+//         * @param len
+//         */
+//        ArrayProxySpliterator(ArrayProxy<T> proxy, int offset, int len) {
+//            super(proxy);
+//            this.offset = offset;
+//            this.endOffset = offset + len;
+//            this.expectedmodcount = proxy.getModCount();
+//        }
+//
+//        @Override
+//        public int getSizeIfKnown() {
+//            comodificationCheck();
+//
+//            return endOffset - offset;
+//        }
+//
+//        @Override
+//        public int estimateSize() {
+//            return getSizeIfKnown();
+//        }
+//
+//        @Override
+//        public boolean isPredictableSplits() {
+//            return true;
+//        }
+//
+//        @Override
+//        public int getNaturalSplits() {
+//            comodificationCheck();
+//
+//            return (endOffset - offset > 1) && !traversing ? 1 : 0;
+//        }
+//
+//        @Override
+//        public Spliterator<T> split() {
+//            comodificationCheck();
+//
+//            if (traversing) {
+//                throw new IllegalStateException("split after starting traversal");
+//            }
+//
+//            int t = (endOffset - offset) / 2;
+//            ArrayProxySpliterator<T> ret = new ArrayProxySpliterator<>(proxy, offset, t);
+//            if(ret.expectedmodcount != expectedmodcount) {
+//                throw new ConcurrentModificationException();
+//            }
+//            offset += t;
+//            return ret;
+//        }
+//
+//        @Override
+//        public Iterator<T> iterator() {
+//            traversing = true;
+//            return this;
+//        }
+//
+//        @Override
+//        public void forEach(Block<? super T> block) {
+//           traversing = true;
+//           super.forEach(block);
+//        }
+//    }
 
-            if(-1 != expectedmodcount) {
-                throw new ConcurrentModificationException();
-            }
-
-            // initalize offset, end offset and leng
-            expectedmodcount = proxy.getModCount();
-            offset = proxy.getOffset();
-            int len = proxy.getLen();
-            assert len >= 0 : "invalid length";
-
-            endOffset = offset + len;
-            assert (offset >= 0) && (offset <= endOffset) : "offset not in array";
-            assert (offset <= proxy.getArray().length) && (endOffset <= proxy.getArray().length);
-        }
-
-        @Override
-        public boolean hasNext() {
-            comodificationCheck();
-
-            return offset < endOffset;
-        }
-
-        @Override
-        public T next() {
-            if(!hasNext()) {
-                throw new NoSuchElementException();
-            }
-
-            return ((T[]) proxy.getArray())[offset++];
-        }
-
-        public void forEach(Block<? super T> sink) {
-            comodificationCheck();
-
-            T[] elements = (T[]) proxy.getArray();
-
-            for (int i=offset; i<endOffset; i++) {
-                sink.apply(elements[i]);
-            }
-            // update only once; reduce heap write traffic
-            offset = endOffset;
-
-            // too late to be useful but may find errors.
-            comodificationCheck();
-        }
-    }
-
-    private static class ArrayProxyStreamAccessor<T>
-            extends ArrayProxyIterator<T> implements StreamAccessor.ForSequential<T> {
-
-        ArrayProxyStreamAccessor(ArrayProxy proxy) {
-            super(proxy);
-        }
-
-        @Override
-        public int getStreamFlags() {
-            return StreamOpFlags.IS_SIZED | StreamOpFlags.IS_ORDERED;
-        }
-
-        @Override
-        public int getSizeIfKnown() {
-            comodificationCheck();
-
-            return endOffset - offset;
-        }
-
-        @Override
-        public Iterator<T> iterator() {
-            return this;
-        }
-    }
-
-    private static class ArrayProxySpliterator<T> extends ArrayProxyStreamAccessor<T> implements Spliterator<T> {
-        boolean traversing = false;
-
-        ArrayProxySpliterator(ArrayProxy proxy) {
-            super(proxy);
-        }
-
-        /**
-         * Constructor for sub-splits.
-         *
-         * @param proxy
-         * @param offset offset of sub-split.
-         * @param len
-         */
-        ArrayProxySpliterator(ArrayProxy proxy, int offset, int len) {
-            super(proxy);
-            this.offset = offset;
-            this.endOffset = offset + len;
-            this.expectedmodcount = proxy.getModCount();
-        }
-
-        @Override
-        public int getSizeIfKnown() {
-            comodificationCheck();
-
-            return endOffset - offset;
-        }
-
-        @Override
-        public int estimateSize() {
-            return getSizeIfKnown();
-        }
-
-        @Override
-        public boolean isPredictableSplits() {
-            return true;
-        }
-
-        @Override
-        public int getNaturalSplits() {
-            comodificationCheck();
-
-            return (endOffset - offset > 1) && !traversing ? 1 : 0;
-        }
-
-        @Override
-        public Spliterator<T> split() {
-            comodificationCheck();
-
-            if (traversing) {
-                throw new IllegalStateException("split after starting traversal");
-            }
-
-            int t = (endOffset - offset) / 2;
-            ArrayProxySpliterator<T> ret = new ArrayProxySpliterator<>(proxy, offset, t);
-            if(ret.expectedmodcount != expectedmodcount) {
-                throw new ConcurrentModificationException();
-            }
-            offset += t;
-            return ret;
-        }
-
-        @Override
-        public Iterator<T> iterator() {
-            traversing = true;
-            return this;
-        }
-
-        @Override
-        public void forEach(Block<? super T> sink) {
-           traversing = true;
-           super.forEach(sink);
-        }
-    }
-
-    private static class ArrayProxyParallelStreamAccessor<T> extends ArrayProxySpliterator<T> {
-
-        ArrayProxyParallelStreamAccessor(ArrayProxy proxy) {
-            super(proxy);
-        }
-
-        @Override
-        public boolean isParallel() {
-            return true;
-        }
-
-        @Override
-        public Spliterator<T> spliterator() {
-            comodificationCheck();
-            return this;
-        }
-    }
+//    private static class ArrayProxyParallelStreamAccessor<T> extends ArrayProxySpliterator<T> {
+//
+//        ArrayProxyParallelStreamAccessor(ArrayProxy<T> proxy) {
+//            super(proxy);
+//        }
+//
+//        @Override
+//        public boolean isParallel() {
+//            return true;
+//        }
+//
+//        @Override
+//        public Spliterator<T> spliterator() {
+//            comodificationCheck();
+//            return this;
+//        }
+//    }
 
     private static class ArraySpliterator<T> extends ArrayIterator<T> implements Spliterator<T> {
         boolean traversing = false;
@@ -744,7 +530,7 @@
 
         @Override
         public int getSizeIfKnown() {
-            return endOffset - offset;
+            return endOffset - curOffset;
         }
 
         @Override
@@ -758,18 +544,18 @@
         }
 
         @Override
-        public void forEach(Block<? super T> sink) {
+        public void forEach(Block<? super T> block) {
             traversing = true;
-            for (int i=offset; i<endOffset; i++) {
-                sink.apply(elements[i]);
+            for (int i= curOffset; i<endOffset; i++) {
+                block.apply(elements[i]);
             }
             // update only once; reduce heap write traffic
-            offset = endOffset;
+            curOffset = endOffset;
         }
 
         @Override
         public int getNaturalSplits() {
-            return (endOffset - offset > 1) ? 1 : 0;
+            return (endOffset - curOffset > 1) ? 1 : 0;
         }
 
         @Override
@@ -777,9 +563,9 @@
             if (traversing) {
                 throw new IllegalStateException("split after starting traversal");
             }
-            int t = (endOffset - offset) / 2;
-            Spliterator<T> ret = new ArraySpliterator<>(elements, offset, t);
-            offset += t;
+            int t = (endOffset - curOffset) / 2;
+            Spliterator<T> ret = new ArraySpliterator<>(elements, curOffset, t);
+            curOffset += t;
             return ret;
         }
 
@@ -789,78 +575,4 @@
             return this;
         }
     }
-
-    private static class ArrayStreamAccessor<T>
-            extends ArrayIterator<T> implements StreamAccessor<T> {
-
-        ArrayStreamAccessor(T[] elements) {
-            this(elements, 0, elements.length);
-        }
-
-        ArrayStreamAccessor(T[] elements, int offset, int length) {
-            super(elements, offset, length);
-        }
-
-        @Override
-        public int getStreamFlags() {
-            return StreamOpFlags.IS_SIZED | StreamOpFlags.IS_ORDERED;
-        }
-
-        @Override
-        public void forEach(Block<? super T> sink) {
-            for (int i=offset; i<endOffset; i++) {
-                sink.apply(elements[i]);
-            }
-            // update only once; reduce heap write traffic
-            offset = endOffset;
-        }
-
-        @Override
-        public boolean isParallel() {
-            return false;
-        }
-
-        @Override
-        public Iterator<T> iterator() {
-            return this;
-        }
-
-        @Override
-        public Spliterator<T> spliterator() {
-            throw new UnsupportedOperationException();
-        }
-    }
-
-    private static class ArrayParallelStreamAccessor<T>
-            extends ArraySpliterator<T> implements StreamAccessor<T> {
-
-        ArrayParallelStreamAccessor(T[] elements) {
-            this(elements, 0, elements.length);
-        }
-
-        ArrayParallelStreamAccessor(T[] elements, int offset, int length) {
-            super(elements, offset, length);
-        }
-
-        @Override
-        public int getStreamFlags() {
-            return StreamOpFlags.IS_SIZED | StreamOpFlags.IS_ORDERED;
-        }
-
-        @Override
-        public boolean isParallel() {
-            return true;
-        }
-
-        @Override
-        public Iterator<T> iterator() {
-            return this;
-        }
-
-        @Override
-        public Spliterator<T> spliterator() {
-            return this;
-        }
-    }
-
 }
--- a/src/share/classes/java/util/streams/ValuePipeline.java	Fri Nov 02 11:36:01 2012 +0100
+++ b/src/share/classes/java/util/streams/ValuePipeline.java	Sat Nov 03 20:12:39 2012 +0100
@@ -41,9 +41,8 @@
  */
 public class ValuePipeline<T, U> extends AbstractPipeline<T,U> implements Stream<U>  {
 
-    public<S> ValuePipeline(StreamAccessor<S> source) {
-        super(source);
-        assert source.getShape() == StreamShape.VALUE;
+    public<S> ValuePipeline(Spliterator<S> spliterator, int sourceFlags) {
+        super(spliterator, sourceFlags, StreamShape.VALUE);
     }
 
     public ValuePipeline(AbstractPipeline<?, T> upstream, IntermediateOp<T, U> op) {
--- a/src/share/classes/java/util/streams/ops/CollectorOps.java	Fri Nov 02 11:36:01 2012 +0100
+++ b/src/share/classes/java/util/streams/ops/CollectorOps.java	Sat Nov 03 20:12:39 2012 +0100
@@ -28,6 +28,7 @@
 import java.util.streams.ParallelPipelineHelper;
 import java.util.streams.PipelineHelper;
 import java.util.streams.Sink;
+import java.util.streams.StreamOpFlags;
 
 public final class CollectorOps {
     private CollectorOps() { }
@@ -53,37 +54,7 @@
         return (Terminal<E_IN>) TERMINAL_COLLECTOR_OP;
     }
 
-    /**
-     * Collect elements into a Node, if evaluated in parallel, and force sequential evaluation on upstream operations,
-     * otherwise, if evaluated sequentially, this operation is a no-op.
-     */
-    public static final class Sequential<E_IN> implements StatefulOp<E_IN, E_IN> {
-
-        private Sequential() { }
-
-        @Override
-        public Iterator<E_IN> wrapIterator(int flags, Iterator<E_IN> in) {
-            return in;
-        }
-
-        @Override
-        public Sink<E_IN> wrapSink(int flags, Sink<E_IN> sink) {
-            return sink;
-        }
-
-        @Override
-        public <P_IN> Node<E_IN> evaluateSequential(PipelineHelper<P_IN, E_IN> helper) {
-            return helper.collectOutput();
-        }
-
-        @Override
-        public <P_IN> Node<E_IN> evaluateParallel(ParallelPipelineHelper<P_IN, E_IN> helper) {
-            // Force sequential evaluation on upstream operations
-            return Nodes.withSequentialStreamAccessor(helper.collectOutput());
-        }
-    }
-
-    public static final class Parallel<E_IN> implements StatefulOp<E_IN, E_IN> {
+    public static class Parallel<E_IN> implements StatefulOp<E_IN, E_IN> {
 
         private Parallel() { }
 
@@ -109,6 +80,20 @@
     }
 
     /**
+     * Collect elements into a Node, if evaluated in parallel, and force sequential evaluation on upstream operations,
+     * otherwise, if evaluated sequentially, this operation is a no-op.
+     */
+    public static class Sequential<E_IN> extends Parallel<E_IN> {
+
+        private Sequential() { }
+
+        @Override
+        public int getOpFlags() {
+            return StreamOpFlags.NOT_PARALLEL;
+        }
+    }
+
+    /**
      * Collect elements into a Node that is the result of terminal evaluation.
      */
     public static final class Terminal<E_IN> implements TerminalOp<E_IN, Node<E_IN>> {
--- a/src/share/classes/java/util/streams/ops/Node.java	Fri Nov 02 11:36:01 2012 +0100
+++ b/src/share/classes/java/util/streams/ops/Node.java	Sat Nov 03 20:12:39 2012 +0100
@@ -26,7 +26,6 @@
 
 import java.util.*;
 import java.util.streams.Spliterator;
-import java.util.streams.StreamAccessor;
 
 public interface Node<T> extends Iterable<T>, Sized {
 
@@ -82,13 +81,4 @@
      */
     void copyInto(T[] array, int offset) throws IndexOutOfBoundsException;
 
-    /**
-     * View this node as a stream accessor.
-     *
-     * @param flags the stream accessor flags.
-     * @return the stream accessor.
-     */
-    StreamAccessor<T> asStreamAccessor(int flags) default {
-        return Nodes.toStreamAccessor(this, flags);
-    }
 }
--- a/src/share/classes/java/util/streams/ops/Nodes.java	Fri Nov 02 11:36:01 2012 +0100
+++ b/src/share/classes/java/util/streams/ops/Nodes.java	Sat Nov 03 20:12:39 2012 +0100
@@ -30,147 +30,6 @@
 import java.util.streams.*;
 
 public class Nodes {
-    public static interface NodeStreamAccessor<T> extends StreamAccessor<T> {
-        Node<T> asNode();
-    }
-
-    /**
-     * Adapt the node to be a node with a sequential (non-parallel) stream accessor.
-     *
-     * @param node the node to adapt.
-     * @return a node with a sequential stream accessor.
-     */
-    public static <T> Node<T> withSequentialStreamAccessor(Node<T> node) {
-        return (node instanceof NodeSequentialStreamAccessor) ? node : new NodeSequentialStreamAccessor<>(node);
-    }
-
-    private static class NodeSequentialStreamAccessor<T> implements Node<T> {
-        private final Node<T> node;
-
-        public NodeSequentialStreamAccessor(Node<T> node) {this.node = node;}
-
-        @Override
-        public Spliterator<T> spliterator() {
-            return node.spliterator();
-        }
-
-        @Override
-        public int getChildCount() {
-            return node.getChildCount();
-        }
-
-        @Override
-        public Iterator<Node<T>> children() {
-            return node.children();
-        }
-
-        @Override
-        public Node flatten() {
-            return node.flatten();
-        }
-
-        @Override
-        public T[] asArray() {
-            return node.asArray();
-        }
-
-        @Override
-        public void copyInto(T[] array, int offset) throws IndexOutOfBoundsException {
-            node.copyInto(array, offset);
-        }
-
-        @Override
-        public StreamAccessor<T> asStreamAccessor(int flags) {
-            return toStreamAccessor(node, flags, false);
-        }
-
-        @Override
-        public Iterator<T> iterator() {
-            return node.iterator();
-        }
-
-        @Override
-        public int size() {
-            return node.size();
-        }
-
-        @Override
-        public boolean isEmpty() {
-            return node.isEmpty();
-        }
-
-        @Override
-        public void forEach(Block<? super T> block) {
-            node.forEach(block);
-        }
-    }
-
-    static <T> NodeStreamAccessor<T> toStreamAccessor(Node<T> node, int flags) {
-        return new NodeStreamAccessorImpl<>(node, flags);
-    }
-
-    static <T> NodeStreamAccessor<T> toStreamAccessor(Node<T> node, int flags, boolean isParallel) {
-        return new NodeStreamAccessorImpl<>(node, flags, isParallel);
-    }
-
-    // @@@ This is a copy of Streams.SpliteratorStreamAccessor
-    private static class NodeStreamAccessorImpl<T> implements NodeStreamAccessor<T> {
-        private final Node<T> node;
-        private final Spliterator<T> spliterator;
-        private final int sizeOrUnknown;
-        private final int flags;
-        private final boolean isParallel;
-
-        NodeStreamAccessorImpl(Node<T> node, int flags) {
-            this(node, flags, true);
-        }
-
-        NodeStreamAccessorImpl(Node<T> node, int flags, boolean isParallel) {
-            this.node = node;
-            this.spliterator = node.spliterator();
-            this.sizeOrUnknown = node.size();
-            // @@@ Injecting order may depend on if order has been explicitly cleared upstream
-            this.flags = StreamOpFlags.IS_ORDERED | StreamOpFlags.IS_SIZED | flags;
-            this.isParallel = isParallel;
-        }
-
-        @Override
-        public Node<T> asNode() {
-            return node;
-        }
-
-        // StreamAccessor
-
-        @Override
-        public void forEach(Block<? super T> sink) {
-            spliterator.forEach(sink);
-        }
-
-        @Override
-        public int getStreamFlags() {
-            return flags;
-        }
-
-        @Override
-        public int getSizeIfKnown() {
-            return sizeOrUnknown;
-        }
-
-        @Override
-        public Iterator<T> iterator() {
-            return spliterator.iterator();
-        }
-
-        @Override
-        public Spliterator<T> spliterator() {
-            return spliterator;
-        }
-
-        @Override
-        public boolean isParallel() {
-            return isParallel;
-        }
-    }
 
     public static<T> Node<T> node(final T[] array) {
         return new ArrayNode<>(array);
@@ -448,7 +307,11 @@
 
         @Override
         public String toString() {
-            return String.format("ConcNode[%s]", Arrays.stream(nodes).map(n -> n.toString()).into(new StringJoiner(",")).toString());
+            if (size() < 32) {
+                return String.format("ConcNode[%s]", Arrays.stream(nodes).map(n -> n.toString()).into(new StringJoiner(",")).toString());
+            } else {
+                return String.format("ConcNode[size=%d]", size());
+            }
         }
 
         private static class ConcNodeSpliterator<T> implements Spliterator<T> {
@@ -498,14 +361,14 @@
             }
 
             @Override
-            public void forEach(Block<? super T> sink) {
+            public void forEach(Block<? super T> block) {
                 if (iterator == null) {
-                    cur.forEach(sink);
+                    cur.forEach(block);
                     iterator = Collections.emptyIterator();
                 }
                 else {
                     while (iterator.hasNext())
-                        sink.apply(iterator.next());
+                        block.apply(iterator.next());
                 }
             }
 
@@ -795,7 +658,7 @@
 
         @Override
         public Stream<T> stream() {
-            return Streams.stream(this, StreamOpFlags.IS_ORDERED);
+            return Streams.stream(this, StreamOpFlags.IS_SIZED | StreamOpFlags.IS_ORDERED);
         }
 
         @Override
@@ -822,62 +685,146 @@
         // </editor-fold>
 
         protected Spliterator<T> spliterator() {
-            return new Spliterator<T>() {
-                Iterator<T> iterator = null;
-                int spineStart = 0;
-                final Spliterator<T> finalChunkSpliterator = (0 != chunkIndex)
-                        ? Streams.spliterator(chunk, 0, chunkIndex)
-                        : Streams.emptySpliterator();
+            return new SpinedListSpliterator();
+        }
 
-                @Override
-                public int getSizeIfKnown() {
+        class SpinedListSpliterator implements Spliterator<T>, Iterator<T>  {
+            Iterator<T> iterator = null;
+
+            boolean traversing = false;
+
+            boolean isSpinedSpliterator = true;
+
+            int spineOffset = 0;
+
+            T[] elements;
+
+            int offset;
+
+            int endOffset;
+
+            SpinedListSpliterator() {
+                if (spineOffset == spineIndex) {
+                    isSpinedSpliterator = false;
+                    elements = chunk;
+                    offset = 0;
+                    endOffset = chunkIndex;
+                }
+            }
+
+            private SpinedListSpliterator(T[] content, int offset, int endOffset) {
+                this.spineOffset = spineIndex;
+                this.elements = content;
+                this.offset = offset;
+                this.endOffset = endOffset;
+            }
+
+            @Override
+            public int getSizeIfKnown() {
+                if (isSpinedSpliterator) {
+                    // @@@ This will not return the correct known size if iterated on
                     int result = SpinedList.this.size();
-                    if(spineStart > 0) {
-                        result -= SpinedList.totalSizeForSpineIndex(spineStart - 1);
+                    if (spineOffset > 0) {
+                        result -= SpinedList.totalSizeForSpineIndex(spineOffset - 1);
                     }
 
                     return result;
+                } else {
+                    return endOffset - offset;
                 }
+            }
 
-                @Override
-                public Iterator<T> iterator() {
-                    if (null == iterator) {
-                        if(spineStart < spineIndex) {
-                            iterator = SpinedList.this.iterator(spineStart);
-                        } else {
-                            iterator = finalChunkSpliterator.iterator();
-                        }
-                    }
+            @Override
+            public Iterator<T> iterator() {
+                traversing = true;
 
+                if (iterator != null) {
                     return iterator;
                 }
 
-                @Override
-                public boolean isPredictableSplits() {
-                    return true;
+                if (isSpinedSpliterator) {
+                    return iterator = SpinedList.this.iterator(spineOffset);
+                } else {
+                    return iterator = this;
+                }
+            }
+
+            @Override
+            public void forEach(Block<? super T> block) {
+                traversing = true;
+
+                if (isSpinedSpliterator) {
+                    Iterator<T> remaining = iterator();
+                    while (remaining.hasNext()) {
+                        block.apply(remaining.next());
+                    }
+                } else {
+                    for (int i = offset; i < endOffset; i++) {
+                        block.apply(elements[i]);
+                    }
+                    // update only once; reduce heap write traffic
+                    offset = endOffset;
+                }
+            }
+
+            @Override
+            public T next() {
+                if (!hasNext()) {
+                    throw new NoSuchElementException();
                 }
 
-                @Override
-                public int getNaturalSplits() {
-                    return (null == iterator)
-                            ? spineIndex - spineStart + finalChunkSpliterator.getNaturalSplits()
-                            : 0;
+                return elements[offset++];
+            }
+
+            @Override
+            public final boolean hasNext() {
+                return offset < endOffset;
+            }
+
+            @Override
+            public boolean isPredictableSplits() {
+                return true;
+            }
+
+            @Override
+            public int getNaturalSplits() {
+                if (traversing) {
+                    return 0;
                 }
 
-                @Override
-                public Spliterator<T> split() {
-                    if (null != iterator) {
-                        throw new IllegalStateException("split after starting traversal");
+                if (isSpinedSpliterator) {
+                    return spineIndex - spineOffset + (chunkIndex > 1 ? 1 : 0);
+                } else {
+                    return (endOffset - offset > 1) ? 1 : 0;
+                }
+            }
+
+            @Override
+            public Spliterator<T> split() {
+                if (traversing) {
+                    throw new IllegalStateException("split after starting traversal");
+                }
+
+                if (isSpinedSpliterator) {
+                    Spliterator<T> ret = Arrays.spliterator(spine[spineOffset++]);
+
+                    if (spineOffset == spineIndex) {
+                        isSpinedSpliterator = false;
+                        elements = chunk;
+                        offset = 0;
+                        endOffset = chunkIndex;
                     }
-                    if (spineStart < spineIndex) {
-                        return Arrays.spliterator(spine[spineStart++]);
-                    } else {
-                        return finalChunkSpliterator.split();
-                    }
+
+                    return ret;
+                } else {
+                    int mid = (endOffset - offset) / 2;
+                    Spliterator<T> ret = Arrays.spliterator(elements, offset, mid);
+                    offset += mid;
+                    return ret;
                 }
-            };
+            }
         }
-   }
+    }
 
     private static class SpinedNodeBuilder<T> extends SpinedList<T> implements Node<T>, NodeBuilder<T> {
 
--- a/src/share/classes/java/util/streams/ops/SortedOp.java	Fri Nov 02 11:36:01 2012 +0100
+++ b/src/share/classes/java/util/streams/ops/SortedOp.java	Sat Nov 03 20:12:39 2012 +0100
@@ -25,6 +25,7 @@
 package java.util.streams.ops;
 
 import java.util.*;
+import java.util.streams.PipelineHelper;
 import java.util.streams.Sink;
 import java.util.streams.StreamOpFlags;
 
--- a/test-ng/tests/org/openjdk/tests/java/util/streams/ops/FlagOpTest.java	Fri Nov 02 11:36:01 2012 +0100
+++ b/test-ng/tests/org/openjdk/tests/java/util/streams/ops/FlagOpTest.java	Sat Nov 03 20:12:39 2012 +0100
@@ -126,7 +126,6 @@
         FlagDeclaringOp[] opsArray = ops.toArray(new FlagDeclaringOp[ops.size()]);
 
         testUsingData(data).without(IntermediateOpTest.PAR_STREAM_TO_ARRAY_CLEAR_SIZED).excerciseOps(opsArray);
-//        exerciseOps(data, opsArray);
     }
 
     public void testFlagsSetAllClear() {
--- a/test-ng/tests/org/openjdk/tests/java/util/streams/ops/NodeTest.java	Fri Nov 02 11:36:01 2012 +0100
+++ b/test-ng/tests/org/openjdk/tests/java/util/streams/ops/NodeTest.java	Sat Nov 03 20:12:39 2012 +0100
@@ -27,10 +27,7 @@
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
+import java.util.*;
 import java.util.functions.Mapper;
 import java.util.streams.Spliterator;
 import java.util.streams.Stream;
@@ -41,35 +38,40 @@
 
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
 
 @Test
 public class NodeTest {
 
     @DataProvider(name = "nodes")
     public Object[][] createSizes() {
-        Integer[] array = new Integer[100];
-        for (int i = 0; i < array.length; i++) {
-            array[i] = i;
+        List<Object[]> params = new ArrayList<>();
+
+        for (int size : Arrays.asList(0, 1, 4, 15, 16, 17, 127, 128, 129, 1000)) {
+            Integer[] array = new Integer[size];
+            for (int i = 0; i < array.length; i++) {
+                array[i] = i;
+            }
+
+            List<Node<Integer>> nodes = new ArrayList<>();
+            nodes.add(Nodes.node(array));
+            nodes.add(Nodes.node(Arrays.asList(array)));
+            nodes.add(Nodes.node((Streamable<Stream<Integer>>) Arrays.asList(array)));
+            nodes.add(Nodes.node(Arrays.asList(array).stream()));
+            nodes.add(Nodes.node(Arrays.asList(array).parallel()));
+            nodes.add(degenerateTree(Arrays.asList(array).iterator()));
+            nodes.add(tree(Arrays.asList(array), l -> Nodes.node(l.toArray(new Integer[l.size()]))));
+            nodes.add(tree(Arrays.asList(array), l -> Nodes.node(l)));
+            nodes.add(fill(array, Nodes.<Integer>makeBuilder(array.length)));
+            nodes.add(fill(array, Nodes.<Integer>makeVariableSizeBuilder()));
+
+            for (int i = 0; i < nodes.size(); i++) {
+                params.add(new Object[]{array, nodes.get(i)});
+            }
+
         }
 
-        List<Node<Integer>> nodes = new ArrayList<>();
-        nodes.add(Nodes.node(array));
-        nodes.add(Nodes.node(Arrays.asList(array)));
-        nodes.add(Nodes.node((Streamable<Stream<Integer>>) Arrays.asList(array)));
-        nodes.add(Nodes.node(Arrays.asList(array).stream()));
-        nodes.add(Nodes.node(Arrays.asList(array).parallel()));
-        nodes.add(degenerateTree(Arrays.asList(array).iterator()));
-        nodes.add(tree(Arrays.asList(array), l -> Nodes.node(l.toArray(new Integer[l.size()]))));
-        nodes.add(tree(Arrays.asList(array), l -> Nodes.node(l)));
-        nodes.add(fill(array, Nodes.<Integer>makeBuilder(array.length)));
-        nodes.add(fill(array, Nodes.<Integer>makeVariableSizeBuilder()));
-
-        Object[][] params = new Object[nodes.size()][];
-        for (int i = 0; i < nodes.size(); i++) {
-            params[i] = new Object[]{array, nodes.get(i)};
-        }
-
-        return params;
+        return params.toArray(new Object[0][]);
     }
 
     Node<Integer> fill(Integer[] array, NodeBuilder<Integer> nb) {
@@ -82,6 +84,10 @@
     }
 
     Node<Integer> degenerateTree(Iterator<Integer> it) {
+        if (!it.hasNext()) {
+            return Nodes.node(Collections.<Integer>emptyList());
+        }
+
         Integer i = it.next();
         if (it.hasNext()) {
             return Nodes.node(Nodes.node(new Integer[]{i}), degenerateTree(it));
@@ -140,6 +146,57 @@
     }
 
     @Test(dataProvider = "nodes")
+    public void testRootSpliterator(Integer[] array, Node<Integer> n) {
+        List<Integer> l = new ArrayList<>(n.size());
+        Iterator<Integer> it = n.spliterator().iterator();
+        while (it.hasNext()) {
+            l.add(it.next());
+        }
+
+        assertEquals(l.toArray(), array);
+    }
+
+    @Test(dataProvider = "nodes")
+    public void testDepthOneSpliterator(Integer[] array, Node<Integer> n) {
+        List<Integer> l = new ArrayList<>(n.size());
+
+        Spliterator<Integer> s = n.spliterator();
+        for (int i = 0; i < s.getNaturalSplits(); i++) {
+            Iterator<Integer> it = s.split().iterator();
+            while (it.hasNext()) {
+                l.add(it.next());
+            }
+        }
+
+        Iterator<Integer> it = s.iterator();
+        while (it.hasNext()) {
+            l.add(it.next());
+        }
+
+        assertEquals(l.toArray(), array);
+    }
+
+    @Test(dataProvider = "nodes")
+    public void testTwoSpliterator(Integer[] array, Node<Integer> n) {
+        List<Integer> l = new ArrayList<>(n.size());
+
+        Spliterator<Integer> s2 = n.spliterator();
+        Spliterator<Integer> s1 = s2.split();
+
+        Iterator<Integer> it = s1.iterator();
+        while (it.hasNext()) {
+            l.add(it.next());
+        }
+
+        it = s2.iterator();
+        while (it.hasNext()) {
+            l.add(it.next());
+        }
+
+        assertEquals(l.toArray(), array);
+    }
+
+    @Test(dataProvider = "nodes")
     public void testSpliterator(Integer[] array, Node<Integer> n) {
         List<Integer> l = new ArrayList<>(n.size());
         split(l, n.spliterator());
@@ -158,9 +215,14 @@
             });
         }
         else {
+            int size = s.getSizeIfKnown();
             for (int i = 0; i < s.getNaturalSplits(); i++) {
-                split(l, s.split());
+                Spliterator<Integer> _s = s.split();
+                split(l, _s);
             }
+
+            assertTrue(s.getSizeIfKnown() < size);
+
             split(l, s);
         }
     }
--- a/test-ng/tests/org/openjdk/tests/java/util/streams/ops/StreamOpTestCase.java	Fri Nov 02 11:36:01 2012 +0100
+++ b/test-ng/tests/org/openjdk/tests/java/util/streams/ops/StreamOpTestCase.java	Sat Nov 03 20:12:39 2012 +0100
@@ -681,7 +681,8 @@
         @Override
         @SuppressWarnings({ "rawtypes", "unchecked" })
         public AbstractPipeline<?, T> seq() {
-            return (AbstractPipeline<?, T>) Streams.stream(collection, -1, StreamOpFlags.IS_ORDERED);
+            Iterable<T> asIterable = () -> collection.iterator();
+            return (AbstractPipeline<?, T>) Streams.stream(asIterable, StreamOpFlags.IS_ORDERED);
         }
     }
 }
--- a/test-ng/tests/org/openjdk/tests/java/util/streams/ops/ToArrayOpTest.java	Fri Nov 02 11:36:01 2012 +0100
+++ b/test-ng/tests/org/openjdk/tests/java/util/streams/ops/ToArrayOpTest.java	Sat Nov 03 20:12:39 2012 +0100
@@ -30,6 +30,8 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.streams.StreamOpFlags;
+import java.util.streams.Streams;
 import java.util.streams.ValuePipeline;
 import java.util.streams.ops.*;
 
@@ -100,19 +102,19 @@
 
         {
             Node<Integer> node = Nodes.node(l);
-            Object[] output = new ValuePipeline<>(node.asStreamAccessor(0)).toArray();
+            Object[] output = Streams.stream(node, StreamOpFlags.IS_SIZED | StreamOpFlags.IS_ORDERED).toArray();
             assertEquals(Arrays.asList(output), l);
         }
 
         {
             Node<Integer> node = Nodes.node(l.toArray(new Integer[l.size()]));
-            Object[] output = new ValuePipeline<>(node.asStreamAccessor(0)).toArray();
+            Object[] output = Streams.stream(node, StreamOpFlags.IS_SIZED | StreamOpFlags.IS_ORDERED).toArray();
             assertEquals(Arrays.asList(output), l);
         }
 
         {
             Node<Integer> node = tree(l);
-            Object[] output = new ValuePipeline<>(node.asStreamAccessor(0)).toArray();
+            Object[] output = Streams.stream(node, StreamOpFlags.IS_SIZED | StreamOpFlags.IS_ORDERED).toArray();
             assertEquals(Arrays.asList(output), l);
         }
 
@@ -121,7 +123,7 @@
             for (Integer i : l) {
                 nodeBuilder.accept(i);
             }
-            Object[] output = new ValuePipeline<>(nodeBuilder.build().asStreamAccessor(0)).toArray();
+            Object[] output = Streams.stream(nodeBuilder, StreamOpFlags.IS_SIZED | StreamOpFlags.IS_ORDERED).toArray();
             assertEquals(Arrays.asList(output), l);
         }
 
@@ -132,13 +134,13 @@
                 nodeBuilder.accept(i);
             }
             nodeBuilder.end();
-            Object[] output = new ValuePipeline<>(nodeBuilder.build().asStreamAccessor(0)).toArray();
+            Object[] output = Streams.stream(nodeBuilder, StreamOpFlags.IS_SIZED | StreamOpFlags.IS_ORDERED).toArray();
             assertEquals(Arrays.asList(output), l);
         }
 
         {
             Node<Integer> node = Nodes.node(l);
-            Object[] output = new ValuePipeline<>(node.asStreamAccessor(0)).sequential().toArray();
+            Object[] output = Streams.stream(node, StreamOpFlags.IS_SIZED | StreamOpFlags.IS_ORDERED).sequential().toArray();
             assertEquals(Arrays.asList(output), l);
         }