changeset 6333:80332caeac10

Refactor of StreamBuilder/Node/NodeBuilder Contributed-by: paul.sandoz@oracle.com
author briangoetz
date Sat, 20 Oct 2012 13:44:10 -0400
parents af4b2dd992d6
children a558348bb5f9
files src/share/classes/java/util/streams/AbstractPipeline.java src/share/classes/java/util/streams/ParallelPipelineHelper.java src/share/classes/java/util/streams/PipelineHelper.java src/share/classes/java/util/streams/Streams.java src/share/classes/java/util/streams/ValuePipeline.java src/share/classes/java/util/streams/ops/AbstractTask.java src/share/classes/java/util/streams/ops/CollectorOp.java src/share/classes/java/util/streams/ops/ConcatOp.java src/share/classes/java/util/streams/ops/CumulateOp.java src/share/classes/java/util/streams/ops/GroupByOp.java src/share/classes/java/util/streams/ops/LimitOp.java src/share/classes/java/util/streams/ops/Node.java src/share/classes/java/util/streams/ops/NodeBuilder.java src/share/classes/java/util/streams/ops/Nodes.java src/share/classes/java/util/streams/ops/StatefulOp.java src/share/classes/java/util/streams/ops/ToArrayOp.java src/share/classes/java/util/streams/ops/TreeUtils.java src/share/classes/java/util/streams/ops/UniqOp.java test-ng/tests/org/openjdk/tests/java/util/streams/ops/ConcatOpTest.java test-ng/tests/org/openjdk/tests/java/util/streams/ops/CumulateOpTest.java test-ng/tests/org/openjdk/tests/java/util/streams/ops/GroupByOpTest.java test-ng/tests/org/openjdk/tests/java/util/streams/ops/NodeBuilderTest.java test-ng/tests/org/openjdk/tests/java/util/streams/ops/NodeTest.java test-ng/tests/org/openjdk/tests/java/util/streams/ops/ToArrayOpTest.java test-ng/tests/org/openjdk/tests/java/util/streams/ops/UniqOpTest.java
diffstat 25 files changed, 1098 insertions(+), 404 deletions(-) [+]
line wrap: on
line diff
--- a/src/share/classes/java/util/streams/AbstractPipeline.java	Fri Oct 19 17:27:49 2012 -0400
+++ b/src/share/classes/java/util/streams/AbstractPipeline.java	Sat Oct 20 13:44:10 2012 -0400
@@ -107,10 +107,10 @@
             if (upToOp < ops.length) {
                 StatefulOp op = (StatefulOp) ops[upToOp];
                 Node<?> intermediateResult = evaluateParallel(accessor, ops, fromOp, upToOp, op);
+
                 // @@@ Inherit other flags from pipeline e.g. the intermediate result may be sorted and/or distinct,
                 //     and is ordered
-                accessor = new Streams.SpliteratorStreamAccessor<>(intermediateResult.spliterator(),
-                                                                   intermediateResult.size(), StreamOpFlags.IS_ORDERED);
+                accessor = Nodes.toStreamAccessor(intermediateResult);
 
                 fromOp = ++upToOp;
             }
@@ -161,19 +161,35 @@
             this.flags = StreamOpFlags.combineStreamFlags(source.getStreamFlags(), opsFlags);
         }
 
+        boolean isOutputSizeKnown() {
+            return StreamOpFlags.SIZED.isKnown(flags);
+        }
+
         boolean isShortCircuit() {
             return StreamOpFlags.SHORT_CIRCUIT.isKnown(flags);
         }
 
-        boolean isInfintelySized() {
-            return StreamOpFlags.INFINITELY_SIZED.isKnown(flags);
+        @Override
+        public boolean hasZeroDepth() {
+            return to == from;
         }
 
+        @Override
         public int getFlags() {
             return flags;
         }
 
         @Override
+        public int getOutputSizeIfKnown() {
+            return isOutputSizeKnown() ? source.getSizeIfKnown() : -1;
+        }
+
+        @Override
+        public boolean isOutputInfinitelySized() {
+            return StreamOpFlags.INFINITELY_SIZED.isKnown(flags);
+        }
+
+        @Override
         public StreamAccessor<P_IN> getStreamAccessor() {
             return source;
         }
@@ -210,12 +226,19 @@
             // @@@ There is currently no mechanism to differentiate between SortedOp and CumulateOp, they are both
             //     stateful but the latter does not need to consume all the input
 
-            if (!terminal.isShortCircuit() && isInfintelySized()) {
+            if (!terminal.isShortCircuit() && isOutputInfinitelySized()) {
                 throw new IllegalStateException("A stream of known infinite size is input to a non-short-circuiting terminal operation");
             }
         }
 
         @Override
+        public Node<E_OUT> collect(boolean flattenTree) {
+            final NodeBuilder<E_OUT> nb = Nodes.makeBuilder(getOutputSizeIfKnown());
+            wrapInto(nb);
+            return nb;
+        }
+
+        @Override
         public void wrapInto(Sink<E_OUT> sink) {
             Objects.requireNonNull(sink);
 
@@ -238,18 +261,36 @@
     }
 
     class ParallelImplPipelineHelper<P_IN> extends AbstractPipelineHelper<P_IN, E_OUT> implements ParallelPipelineHelper<P_IN, E_OUT> {
+        final long targetSize;
+
         ParallelImplPipelineHelper(StreamAccessor<P_IN> source, IntermediateOp[] ops, int from, int to) {
             super(source, ops, from, to);
+            this.targetSize = suggestTargetSize();
+        }
+
+        long suggestTargetSize() {
+            int estimate = source.estimateSize();
+            return estimate >= 0
+                   ? ForkJoinUtils.suggestTargetSize(estimate)
+                   : 2; // @@@ SWAG
         }
 
         //
 
         @Override
-        public long suggestTargetSize() {
-            int estimate = source.estimateSize();
-            return estimate >= 0
-                   ? ForkJoinUtils.suggestTargetSize(estimate)
-                   : 2; // @@@ SWAG
+        public Node<E_OUT> collect(boolean flattenTree) {
+            return TreeUtils.collect(this, flattenTree);
+        }
+
+        @Override
+        public boolean suggestNotSplit(Spliterator<P_IN> spliterator) {
+            int remaining = spliterator.estimateSize();
+            return (remaining <= suggestTargetSize() && remaining >= 0) || (spliterator.getNaturalSplits() == 0);
+        }
+
+        @Override
+        public NodeBuilder<E_OUT> makeBuilderFor(Spliterator<P_IN> s) {
+            return Nodes.makeBuilder(isOutputSizeKnown() ? s.estimateSize() : -1);
         }
 
         @Override
--- a/src/share/classes/java/util/streams/ParallelPipelineHelper.java	Fri Oct 19 17:27:49 2012 -0400
+++ b/src/share/classes/java/util/streams/ParallelPipelineHelper.java	Sat Oct 20 13:44:10 2012 -0400
@@ -26,6 +26,7 @@
 
 import java.util.Iterator;
 import java.util.concurrent.ForkJoinTask;
+import java.util.streams.ops.NodeBuilder;
 
 /**
  *
@@ -34,7 +35,24 @@
  */
 public interface ParallelPipelineHelper<P_IN, P_OUT> extends PipelineHelper<P_IN, P_OUT> {
 
-    long suggestTargetSize();
+    /**
+     * Ascertain if the spliterator should be split or not.
+     *
+     * @param s the spliterator to be split
+     * @return true if the spliterator should not be further split, otherwise false.
+     */
+    boolean suggestNotSplit(Spliterator<P_IN> s);
+
+    /**
+     * Make a node builder from which to push output elements, where the input elements are obtained from
+     * a spliterator.
+     *
+     * <p>The builder will be optimized based on known size information of the spliterator and pipeline.</p>
+     *
+     * @param s the spliterator containing input elements.
+     * @return a node builder.
+     */
+    public NodeBuilder<P_OUT> makeBuilderFor(Spliterator<P_IN> s);
 
     <R> R evaluateSequential(Spliterator<P_IN> sp, TerminalSink<P_OUT, R> sink) default {
         wrapInto(sp, sink);
--- a/src/share/classes/java/util/streams/PipelineHelper.java	Fri Oct 19 17:27:49 2012 -0400
+++ b/src/share/classes/java/util/streams/PipelineHelper.java	Sat Oct 20 13:44:10 2012 -0400
@@ -25,6 +25,8 @@
 package java.util.streams;
 
 import java.util.Iterator;
+import java.util.streams.ops.Node;
+import java.util.streams.ops.NodeBuilder;
 
 /**
  *
@@ -34,6 +36,12 @@
 public interface PipelineHelper<P_IN, P_OUT> {
 
     /**
+     *
+     * @return true if the pipeline has no intermediate operations, otherwise false.
+     */
+    boolean hasZeroDepth();
+
+    /**
      * @return the combined stream and operation flags.
      *
      * @see {@link StreamOpFlags}
@@ -41,6 +49,31 @@
     int getFlags();
 
     /**
+     * Get the size of the stream output from the pipeline if known.
+     *
+     * @return the size of the output stream, otherwise {@code -1} if the size is unknown.
+     */
+    int getOutputSizeIfKnown();
+
+    /**
+     *
+     * @return true if the output stream is known to be infinitely sized, otherwise false.
+     */
+    boolean isOutputInfinitelySized();
+
+    Node<P_OUT> collect() default {
+        return collect(false);
+    }
+
+    /**
+     * Collect the output elements as leaf nodes of a node tree.
+     *
+     * @param flattenTree whether the node tree should be flattened to one node.
+     * @return the root node.
+     */
+    Node<P_OUT> collect(boolean flattenTree);
+
+    /**
      * Evaluate the pipeline and return the result of evaluation.
      *
      * @param sink the terminal sink, the last in the sink chain that produces a result.
@@ -62,7 +95,7 @@
     void wrapInto(Sink<P_OUT> sink) default {
         Sink<P_IN> wrappedSink = wrapSink(sink);
         StreamAccessor<P_IN> source = getStreamAccessor();
-        wrappedSink.begin(source.estimateSize());
+        wrappedSink.begin(source.getSizeIfKnown());
         source.forEach(wrappedSink);
         wrappedSink.end();
     }
--- a/src/share/classes/java/util/streams/Streams.java	Fri Oct 19 17:27:49 2012 -0400
+++ b/src/share/classes/java/util/streams/Streams.java	Sat Oct 20 13:44:10 2012 -0400
@@ -395,7 +395,7 @@
         }
     }
 
-    static class SpliteratorStreamAccessor<T> implements StreamAccessor<T> {
+    private static class SpliteratorStreamAccessor<T> implements StreamAccessor<T> {
         private final Spliterator<T> spliterator;
         private final int sizeOrUnknown;
         private final int flags;
--- a/src/share/classes/java/util/streams/ValuePipeline.java	Fri Oct 19 17:27:49 2012 -0400
+++ b/src/share/classes/java/util/streams/ValuePipeline.java	Sat Oct 20 13:44:10 2012 -0400
@@ -107,7 +107,7 @@
 
     @Override
     public Stream<U> concat(Stream<? extends U> other) {
-        return chainValue(ConcatOp.<U>make(other));
+        return chainValue(new ConcatOp<>(other));
     }
 
     @Override
@@ -164,7 +164,7 @@
             return this;
         }
         else {
-            Node<U> collected = evaluate(TreeUtils.CollectorOp.<U>singleton());
+            Node<U> collected = evaluate(CollectorOp.<U>singleton());
             return Streams.stream(collected, collected.size());
         }
     }
--- a/src/share/classes/java/util/streams/ops/AbstractTask.java	Fri Oct 19 17:27:49 2012 -0400
+++ b/src/share/classes/java/util/streams/ops/AbstractTask.java	Sat Oct 20 13:44:10 2012 -0400
@@ -42,7 +42,6 @@
         extends CountedCompleter<R> {
     protected final ParallelPipelineHelper<P_IN, P_OUT> helper;
     protected final Spliterator<P_IN> spliterator;
-    protected final long targetSize;
 
     protected int numChildren;
     protected T children;
@@ -54,14 +53,12 @@
         super(null);
         this.helper = helper;
         this.spliterator = helper.spliterator();
-        this.targetSize = helper.suggestTargetSize();
     }
 
     protected AbstractTask(T parent, Spliterator<P_IN> spliterator) {
         super(parent);
         this.helper = parent.helper;
         this.spliterator = spliterator;
-        this.targetSize = parent.targetSize;
     }
 
     protected abstract T makeChild(Spliterator<P_IN> spliterator);
@@ -99,14 +96,13 @@
 
     @Override
     public void compute() {
-        int remaining = spliterator.estimateSize();
-        int naturalSplits = spliterator.getNaturalSplits();
-        isLeaf = ((remaining <= targetSize) && (remaining >= 0)) || (naturalSplits == 0);
+        isLeaf = helper.suggestNotSplit(spliterator);
         if (isLeaf) {
             setRawResult(doLeaf());
             tryComplete();
         }
         else {
+            int naturalSplits = spliterator.getNaturalSplits();
             T curChild = null;
             setPendingCount(naturalSplits);
             numChildren = naturalSplits + 1;
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/share/classes/java/util/streams/ops/CollectorOp.java	Sat Oct 20 13:44:10 2012 -0400
@@ -0,0 +1,47 @@
+/*
+ * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.  Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+package java.util.streams.ops;
+
+import java.util.streams.ParallelPipelineHelper;
+import java.util.streams.PipelineHelper;
+
+public class CollectorOp<E_IN> implements TerminalOp<E_IN, Node<E_IN>> {
+
+    private static CollectorOp<?> INSTANCE = new CollectorOp<>();
+
+    public static <E_IN> CollectorOp<E_IN> singleton() {
+        return (CollectorOp<E_IN>) INSTANCE;
+    }
+
+    @Override
+    public <P_IN> Node<E_IN> evaluateSequential(PipelineHelper<P_IN, E_IN> helper) {
+        return helper.collect();
+    }
+
+    @Override
+    public <P_IN> Node<E_IN> evaluateParallel(ParallelPipelineHelper<P_IN, E_IN> helper) {
+        return helper.collect();
+    }
+}
--- a/src/share/classes/java/util/streams/ops/ConcatOp.java	Fri Oct 19 17:27:49 2012 -0400
+++ b/src/share/classes/java/util/streams/ops/ConcatOp.java	Sat Oct 20 13:44:10 2012 -0400
@@ -29,12 +29,18 @@
 import java.util.Objects;
 import java.util.streams.*;
 
-public abstract class ConcatOp<T> implements StatefulOp<T, T> {
+public class ConcatOp<T> implements StatefulOp<T, T> {
 
-    private final StreamShape shape;
+    private final Stream<? extends T> stream;
 
-    private ConcatOp(StreamShape shape) {
-        this.shape = shape;
+    // @@@ Support Streamable
+    //     This will ensure the operation can be used in detached pipelines
+    //     and there is a choice to obtain the serial or parallel stream
+    //     There are still cases where Stream is useful e.g. from I/O sources
+    // @@@ Requires flags are available on Streamable for analysis of the pipeline
+    // @@@ Might be possible for some consolidation if Streamable was generic to Stream/MapStream
+    public ConcatOp(Stream<? extends T> stream) {
+        this.stream = stream;
     }
 
     @Override
@@ -43,102 +49,46 @@
     }
 
     @Override
-    public abstract Iterator<T> wrapIterator(Iterator<T> source);
+    public Iterator<T> wrapIterator(Iterator<T> source) {
+        Objects.requireNonNull(source);
 
-    @Override
-    public abstract Sink<T> wrapSink(Sink sink);
-
-    @Override
-    public StreamShape inputShape() {
-        return shape;
+        return Iterators.concat(source, stream.iterator());
     }
 
     @Override
-    public StreamShape outputShape() {
-        return shape;
+    public Sink<T> wrapSink(Sink sink) {
+        Objects.requireNonNull(sink);
+
+        return new Sink.ChainedValue<T>(sink) {
+            @Override
+            public void accept(T t) {
+                downstream.accept(t);
+            }
+
+            @Override
+            public void end() {
+                // Pull from the concatenating stream to ensure sequential access
+                // Note that stream.forEach(downstream) will not, in the parallel case,
+                // guarantee an order, and stream.sequential().forEach(downstream) will
+                // result in buffering of the stream contents
+                Iterator<? extends T> i = stream.iterator();
+                while (i.hasNext()) {
+                    downstream.accept(i.next());
+                }
+                downstream.end();
+            }
+        };
     }
 
     @Override
     public <S> Node<T> evaluateParallel(ParallelPipelineHelper<S, T> helper) {
         // Get all stuff from upstream
-        Node<T> upStreamNode = TreeUtils.collect(helper, false);
+        Node<T> upStreamNode = helper.collect();
 
         // Get stuff from concatenation
-        Node<T> concatStreamNode = computeParallelFromConcatenatingStream();
+        Node<T> concatStreamNode = Nodes.node(stream);
 
         // Combine
         return Nodes.node(upStreamNode, concatStreamNode);
     }
-
-    protected abstract Node<T> computeParallelFromConcatenatingStream();
-
-    // @@@ Support Streamable
-    //     This will ensure the operation can be used in detached pipelines
-    //     and there is a choice to obtain the serial or parallel stream
-    //     There are still cases where Stream is useful e.g. from I/O sources
-    // @@@ Requires flags are available on Streamable for analysis of the pipeline
-    // @@@ Might be possible for some consolidation if Streamable was generic to Stream/MapStream
-    public static <T> ConcatOp<T> make(final Stream<? extends T> stream) {
-        return new ConcatOp<T>(StreamShape.VALUE) {
-
-            @Override
-            public Iterator<T> wrapIterator(Iterator<T> source) {
-                Objects.requireNonNull(source);
-
-                return Iterators.concat(source, stream.iterator());
-            }
-
-            @Override
-            public Sink<T> wrapSink(Sink sink) {
-                Objects.requireNonNull(sink);
-
-                return new Sink.ChainedValue<T>(sink) {
-                    @Override
-                    public void accept(T t) {
-                        downstream.accept(t);
-                    }
-
-                    @Override
-                    public void end() {
-                        // Pull from the concatenating stream to ensure sequential access
-                        // Note that stream.forEach(downstream) will not, in the parallel case,
-                        // guarantee an order, and stream.sequential().forEach(downstream) will
-                        // result in buffering of the stream contents
-                        Iterator<? extends T> i = stream.iterator();
-                        while (i.hasNext()) {
-                            downstream.accept(i.next());
-                        }
-                        downstream.end();
-                    }
-                };
-            }
-
-            @Override
-            protected Node<T> computeParallelFromConcatenatingStream() {
-                // Get stuff from concat stream
-                if (stream.isParallel() && stream instanceof AbstractPipeline) {
-                    // @@@ Yuk! the cast sucks, but it avoids uncessary wrapping
-                    // @@@ Also dangerous as CollectorOp can only be used when the stream is parallel
-                    return ((AbstractPipeline<T, T>) stream).pipeline(TreeUtils.CollectorOp.<T>singleton());
-                }
-                else {
-                    // @@@ Yuk! too much copying
-                    final NodeBuilder<T> nb = Nodes.makeBuilder();
-
-                    // @@@ stream.into(sb) fails because the NodeBuilder does not implement the full contract
-                    // of Collection
-
-                    nb.begin(-1);
-                    Iterator<? extends T> i = stream.iterator();
-                    while (i.hasNext())
-                        nb.accept(i.next());
-                    nb.end();
-
-                    return nb;
-                }
-            }
-
-        };
-    }
-
 }
--- a/src/share/classes/java/util/streams/ops/CumulateOp.java	Fri Oct 19 17:27:49 2012 -0400
+++ b/src/share/classes/java/util/streams/ops/CumulateOp.java	Sat Oct 20 13:44:10 2012 -0400
@@ -117,16 +117,30 @@
     }
 
     private static class Problem<S, T> {
-        final long targetSize;
         final BinaryOperator<T> op;
         final ParallelPipelineHelper<S, T> helper;
         int pass = 0;
 
         private Problem(BinaryOperator<T> op, ParallelPipelineHelper<S, T> helper) {
-            this.targetSize = helper.suggestTargetSize();
             this.op = op;
             this.helper = helper;
         }
+
+        T operate(T a, T b) {
+            // If there is an operation before cumulate that removes elements, such as filter
+            // then elements may never get pushed into the sink. In such a case the terminal
+            // sink will return null on a call to "getAndClearState".
+            //
+            // This is more likely to occur if the leaf spliterator size is small
+
+            if (a == null && b == null)
+                return null;
+            if (a == null)
+                return b;
+            if (b == null)
+                return a;
+            return op.operate(a, b);
+        }
     }
 
     private class CumulateTask<S> extends RecursiveTask<Node<T>> {
@@ -159,16 +173,14 @@
         protected Node<T> compute() {
             switch (problem.pass) {
                 case 0:
-                    int remaining = spliterator.estimateSize();
-                    int naturalSplits = spliterator.getNaturalSplits();
-                    isLeaf = ((remaining <= problem.targetSize) && (remaining >= 0)) || (naturalSplits == 0);
+                    isLeaf = problem.helper.suggestNotSplit(spliterator);
                     if (!isLeaf) {
                         left = new CumulateTask<>(spliterator.split(), problem);
                         right = new CumulateTask<>(spliterator, problem);
                         right.fork();
                         left.compute();
                         right.join();
-                        upward = problem.op.operate(left.upward, right.upward);
+                        upward = problem.operate(left.upward, right.upward);
                         if (isRoot) {
                             downwardZero = true;
                             problem.pass = 1;
@@ -176,10 +188,8 @@
                         }
                     }
                     else {
-                        leafData = Nodes.makeBuilder();
-                        TerminalSink<T, T> terminalSink = wrapSink(leafData);
-                        problem.helper.wrapInto(spliterator, terminalSink);
-                        upward = terminalSink.getAndClearState();
+                        leafData = problem.helper.makeBuilderFor(spliterator);
+                        upward = problem.helper.evaluateSequential(spliterator, wrapSink(leafData));
                         // Special case -- if problem.depth == 0, just wrap the result and be done
                         if (isRoot)
                             return leafData;
@@ -196,7 +206,7 @@
                         }
                         else {
                             left.downward = downward;
-                            right.downward = problem.op.operate(downward, left.upward);
+                            right.downward = problem.operate(downward, left.upward);
                         }
                         right.fork();
                         Node<T> leftResult = left.compute();
@@ -204,16 +214,17 @@
                         return Nodes.node(leftResult, rightResult);
                     }
                     else {
-                        // @@@ Pretty inefficient; should update in place when we have a better NodeBuilder representation
-                        result = Nodes.makeBuilder(leafData.size());
-                        if (downwardZero) {
-                            leafData.forEach(result);
+                        if (!downwardZero && downward != null) {
+                            if (leafData instanceof NodeBuilder.Fixed) {
+                                T[] content = ((NodeBuilder.Fixed<T>)leafData).getContent();
+                                for (int i = 0; i < leafData.size(); i++) {
+                                    content[i] = problem.op.operate(downward, content[i]);
+                                }
+                            } else {
+                                leafData.forEachUpdate(e -> problem.op.operate(downward, e));
+                            }
                         }
-                        else {
-                            for (T t : leafData)
-                                result.accept(problem.op.operate(downward, t));
-                        }
-                        return result;
+                        return leafData;
                     }
 
                 default:
--- a/src/share/classes/java/util/streams/ops/GroupByOp.java	Fri Oct 19 17:27:49 2012 -0400
+++ b/src/share/classes/java/util/streams/ops/GroupByOp.java	Sat Oct 20 13:44:10 2012 -0400
@@ -24,11 +24,9 @@
  */
 package java.util.streams.ops;
 
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Objects;
+import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.functions.Factory;
 import java.util.functions.Mapper;
 import java.util.logging.Level;
 import java.util.logging.Logger;
@@ -44,17 +42,27 @@
  * @param <K> Type of elements in the resulting Map.
  * @author Brian Goetz
  */
+// @@@ Change from ArrayList to using a SpinedList, to be implemented, which avoids copying
+//     of data when list needs to be re-sized
 public class GroupByOp<T, K> implements TerminalOp<T, Map<K, Collection<T>>> {
+
     private final Mapper<? super T, ? extends K> mapper;
 
+    private final Factory<Collection<T>> valueFactory;
+
     public GroupByOp(Mapper<? super T, ? extends K> mapper) {
+        this(mapper, ArrayList::new);
+    }
+
+    public GroupByOp(Mapper<? super T, ? extends K> mapper, Factory<Collection<T>> valueFactory) {
         this.mapper = mapper;
+        this.valueFactory = valueFactory;
     }
 
     // Public for tests
     public TerminalSink<T, Map<K, Collection<T>>> sink() {
         return new TerminalSink<T, Map<K, Collection<T>>>() {
-            private Map<K, NodeBuilder<T>> map;
+            private Map<K, Collection<T>> map;
 
             @Override
             public void begin(int size) {
@@ -63,8 +71,7 @@
 
             @Override
             public Map<K, Collection<T>> getAndClearState() {
-                // @@@ Fragile cast, need a better way to switch NodeBuilder into Collection
-                Map<K, Collection<T>> result = (Map) map;
+                Map<K, Collection<T>> result = map;
                 map = null;
                 return result;
             }
@@ -72,12 +79,12 @@
             @Override
             public void accept(T t) {
                 K key = Objects.requireNonNull(mapper.map(t), String.format("The element %s cannot be mapped to a null key", t));
-                NodeBuilder<T> sb = map.get(key);
-                if (sb == null) {
-                    sb = Nodes.makeBuilder();
-                    map.put(key, sb);
+                Collection<T> c = map.get(key);
+                if (c == null) {
+                    c = valueFactory.make();
+                    map.put(key, c);
                 }
-                sb.accept(t);
+                c.add(t);
             }
         };
     }
@@ -95,23 +102,22 @@
             Logger.getLogger(getClass().getName()).log(Level.WARNING, "GroupByOp.evaluateParallel does not preserve encounter order");
         }
 
-        final ConcurrentHashMap<K, NodeBuilder<T>> map = new ConcurrentHashMap<>();
+        final ConcurrentHashMap<K, Collection<T>> map = new ConcurrentHashMap<>();
 
         // Cache the sink chain, so it can be reused by all F/J leaf tasks
         Sink<S> sinkChain = helper.wrapSink(new Sink.OfValue<T>() {
             @Override
             public void accept(T t) {
                 K key = Objects.requireNonNull(mapper.map(t), String.format("The element %s cannot be mapped to a null key", t));
-                final NodeBuilder<T> sb = map.computeIfAbsent(key, k -> Nodes.makeBuilder());
+                final Collection<T> sb = map.computeIfAbsent(key, (k) -> valueFactory.make());
                 synchronized (sb) {
-                    sb.accept(t);
+                    sb.add(t);
                 }
             }
         });
 
         OpUtils.forEach(helper, sinkChain);
 
-        // @@@ Fragile cast, need a better way to switch NodeBuilder into Collection
-        return (Map) map;
+        return map;
     }
 }
--- a/src/share/classes/java/util/streams/ops/LimitOp.java	Fri Oct 19 17:27:49 2012 -0400
+++ b/src/share/classes/java/util/streams/ops/LimitOp.java	Sat Oct 20 13:44:10 2012 -0400
@@ -66,9 +66,13 @@
         // Dumb serial implementation defering to iterator
         final Iterator<T> i = wrapIterator(helper.iterator());
 
-        // @@@ if limit is small enough can use fixed size builder
-        final NodeBuilder<T> nb = Nodes.makeBuilder();
-        nb.begin(limit); // @@@ what if source is smaller than limit?
+        final int size = helper.isOutputInfinitelySized()
+                         ? limit
+                         : Math.min(helper.getOutputSizeIfKnown(), limit);
+
+        final NodeBuilder<T> nb = Nodes.makeBuilder(size);
+
+        nb.begin(size);
         while (i.hasNext())
             nb.accept(i.next());
         nb.end();
--- a/src/share/classes/java/util/streams/ops/Node.java	Fri Oct 19 17:27:49 2012 -0400
+++ b/src/share/classes/java/util/streams/ops/Node.java	Sat Oct 20 13:44:10 2012 -0400
@@ -24,18 +24,61 @@
  */
 package java.util.streams.ops;
 
-import java.util.Sized;
-import java.util.Traversable;
+import java.util.*;
 import java.util.streams.Spliterator;
 
 public interface Node<T> extends Traversable<T>, Sized {
 
-    void copyTo(T[] array, int offset);
-
+    /**
+     *
+     * @return the spliterator for this node.
+     */
     Spliterator<T> spliterator();
 
-    interface ConcNode<T> extends Node<T> {
+    /**
+     *
+     * @return the number of child nodes
+     */
+    int getChildCount() default {
+        return 0;
+    }
 
-        Node<T>[] nodes();
+    /**
+     *
+     * @return the child nodes.
+     */
+    Iterator<Node<T>> children() default {
+        return Collections.emptyIterator();
     }
+
+    /**
+     * Flatten this node to a node that has no children.
+     * <p>If this node has no children then this node may be returned.</p>
+     *
+     * @return a flattened node which has no children but holds the equivalent content as this node.
+     */
+    Node flatten();
+
+    /**
+     * View this node as an array.
+     * <p/>
+     * <p>Depending on the underlying implementation this may return a reference to an array rather than
+     * a copy. It is the callers reponsibility to decide if either this node or the array is
+     * utilized as the primary reference for the data.</p>
+     *
+     * @return the array.
+     */
+    T[] asArray();
+
+    /**
+     * Copy the content of this node into an array at an offset.
+     *
+     *
+     * @param array the array to copy the data into.
+     * @param offset the offset into the array.
+     * @throws IndexOutOfBoundsException if copying would cause access of data outside array bounds.
+     * @throws NullPointerException if <code>array</code> is <code>null</code>.
+     */
+    void copyInto(T[] array, int offset) throws IndexOutOfBoundsException;
+
 }
--- a/src/share/classes/java/util/streams/ops/NodeBuilder.java	Fri Oct 19 17:27:49 2012 -0400
+++ b/src/share/classes/java/util/streams/ops/NodeBuilder.java	Sat Oct 20 13:44:10 2012 -0400
@@ -25,30 +25,40 @@
 package java.util.streams.ops;
 
 import java.util.Collection;
-import java.util.functions.Block;
+import java.util.functions.UnaryOperator;
 import java.util.streams.Sink;
 
 /**
- * NodeBuilder
+ * A builder of a node.
  *
  * @author Brian Goetz
  */
-// @@@ Support in place update for CumulateOp use-case
-public interface NodeBuilder<T> extends Node<T>, Collection<T>, Sink.OfValue<T> {
+// @@@ Implement Stream.Destination rather than Collection for bulk adding with addAll ? (see Nodes.node(Stream ) )
+public interface NodeBuilder<T> extends Node<T>, Sink.OfValue<T> {
 
-    @Override
-    void forEach(Block<? super T> block);
-
-    @Override
     void clear();
 
-    @Override
-    Object[] toArray();
+    /**
+     * Update all elements in place.
+     *
+     * @param operator the function to update elements
+     */
+    void forEachUpdate(UnaryOperator<T> operator);
+
+    // @@@ view: Node<T> asNode() rather than extending from Node ?
 
     /**
-     * View the node builder as a collection.
+     * A builder of a node that uses a fixed sized array for holding the node content.
      *
-     * @return the collection.
      */
-    Collection<T> asCollection();
+    interface Fixed<T> extends NodeBuilder<T> {
+
+        /**
+         * Get the array that holds the node content.
+         * The size of the node may be less than or equal to the size of the returned array.
+         *
+         * @return the array that holds the node content.
+         */
+        T[] getContent();
+    }
 }
--- a/src/share/classes/java/util/streams/ops/Nodes.java	Fri Oct 19 17:27:49 2012 -0400
+++ b/src/share/classes/java/util/streams/ops/Nodes.java	Sat Oct 20 13:44:10 2012 -0400
@@ -26,24 +26,82 @@
 
 import java.util.*;
 import java.util.functions.Block;
+import java.util.functions.UnaryOperator;
 import java.util.logging.Logger;
-import java.util.streams.Spliterator;
-import java.util.streams.Stream;
-import java.util.streams.Streams;
+import java.util.streams.*;
 
 public class Nodes {
+    static interface NodeStreamAccessor<T> extends StreamAccessor<T> {
+        Node<T> asNode();
+    }
+
+    public static <T> NodeStreamAccessor<T> toStreamAccessor(Node<T> node) {
+        return new NodeStreamAccessorImpl<>(node);
+    }
+
+    // @@@ This is a copy of Streams.SpliteratorStreamAccessor
+    private static class NodeStreamAccessorImpl<T> implements NodeStreamAccessor<T> {
+        private final Node<T> node;
+        private final Spliterator<T> spliterator;
+        private final int sizeOrUnknown;
+        private final int flags;
+
+        public NodeStreamAccessorImpl(Node<T> node) {
+            this.node = node;
+            this.spliterator = node.spliterator();
+            this.sizeOrUnknown = node.size();
+            this.flags = StreamOpFlags.IS_ORDERED;
+        }
+
+        @Override
+        public Node<T> asNode() {
+            return node;
+        }
+
+        // StreamAccessor
+
+        @Override
+        public void forEach(Block<T> sink) {
+            spliterator.forEach(sink);
+        }
+
+        @Override
+        public int getStreamFlags() {
+            return flags;
+        }
+
+        @Override
+        public int getSizeIfKnown() {
+            return sizeOrUnknown;
+        }
+
+        @Override
+        public Iterator<T> iterator() {
+            return spliterator.iterator();
+        }
+
+        @Override
+        public Spliterator<T> spliterator() {
+            return spliterator;
+        }
+
+        @Override
+        public boolean isParallel() {
+            return true;
+        }
+    }
 
     public static<T> Node<T> node(final T[] array) {
         return new ArrayNode<>(array);
     }
 
-    private static class ArrayNode<T> extends AbstractCollection<T> implements Node<T> {
+    private static class ArrayNode<T> implements Node<T> {
 
         final T[] array;
         int curSize;
 
         ArrayNode(int size) {
-            array = (T[])new Object[size];
+            this((T[]) new Object[size]);
         }
 
         ArrayNode(T[] array) {
@@ -51,7 +109,12 @@
             this.curSize = array.length;
         }
 
-        //
+        // Node
+
+        @Override
+        public Node flatten() {
+            return this;
+        }
 
         @Override
         public Spliterator<T> spliterator() {
@@ -59,11 +122,20 @@
         }
 
         @Override
-        public void copyTo(T[] dest, int destOffset) {
+        public void copyInto(T[] dest, int destOffset) {
             System.arraycopy(array, 0, dest, destOffset, curSize);
         }
 
-        //
+        @Override
+        public T[] asArray() {
+            if (array.length == curSize) {
+                return array;
+            } else {
+                return Arrays.copyOf(array, curSize);
+            }
+        }
+
+        // Traversable
 
         @Override
         public void forEach(Block<? super T> block) {
@@ -72,35 +144,21 @@
             }
         }
 
-        @Override
-        public Stream<T> stream() {
-            return Streams.stream(this, size());
-        }
+        // Iterable
 
         @Override
         public Iterator<T> iterator() {
             return Arrays.iterator(array, 0, curSize);
         }
 
-        @Override
-        public void clear() {
-            curSize = 0;
-        }
+        // Sized
 
         @Override
         public int size() {
             return curSize;
         }
 
-        @Override
-        public Object[] toArray() {
-            if (curSize == array.length) {
-                return array;
-            } else {
-                // @@@ Should this throw ISE instead?
-                return Arrays.copyOf(array, curSize);
-            }
-        }
+        //
 
         @Override
         public String toString() {
@@ -108,19 +166,192 @@
         }
     }
 
-    @SafeVarargs
-    public static<T> Node.ConcNode<T> node(Node<T>... nodes) {
-        return new ConcNodeImpl<>(nodes);
+    public static<T> Node<T> node(Collection<T> c) {
+        return new CollectionNode<>(c);
     }
 
-    private static class ConcNodeImpl<T> implements Node.ConcNode<T> {
+    private static class CollectionNode<T> implements Node<T> {
+
+        final Collection<T> c;
+
+        CollectionNode(Collection<T> c) {
+            this.c = c;
+        }
+
+        // Node
+
+        @Override
+        public Node flatten() {
+            return this;
+        }
+
+        @Override
+        public Spliterator<T> spliterator() {
+            // @@@ Not very efficient
+            //     This requires a general way to obtain a spliterator from a collection
+            return Arrays.spliterator((T[]) c.toArray());
+        }
+
+        @Override
+        public void copyInto(T[] array, int offset) {
+            // @@@ Not very efficient
+            Objects.requireNonNull(array);
+            for (T t : this)
+                array[offset++] = t;
+        }
+
+        @Override
+        public T[] asArray() {
+            T[] array = (T[]) new Object[c.size()];
+            copyInto(array, 0);
+            return array;
+        }
+
+        // Traversable
+
+        @Override
+        public void forEach(Block<? super T> block) {
+            c.forEach(block);
+        }
+
+        // Iterable
+
+        @Override
+        public Iterator<T> iterator() {
+            return c.iterator();
+        }
+
+        // Sized
+
+        @Override
+        public int size() {
+            return c.size();
+        }
+
+        //
+
+        @Override
+        public String toString() {
+            return String.format("CollectionNode[%d][%s]", c.size(), c);
+        }
+    }
+
+    public static<T> Node<T> node(Streamable<Stream<T>> s) {
+        return node(s.parallel());
+    }
+
+    public static<T> Node<T> node(Stream<? extends T> stream) {
+        if (stream instanceof AbstractPipeline) {
+            // @@@ Yuk! the cast sucks, but it avoids unnecessary wrapping
+            return ((AbstractPipeline<T, T>) stream).pipeline(CollectorOp.<T>singleton());
+        }
+        else {
+            // @@@ This path can occur if there are other implementations of Stream
+            //     e.g. if a Stream instance is a proxy
+            final NodeBuilder<T> nb = Nodes.makeBuilder();
+
+            // @@@ stream.into(sb) fails because the NodeBuilder does not implement the full contract
+            // of Collection
+            // @@@ NodeBuilder could implement Stream.Destination
+
+            nb.begin(-1);
+            Iterator<? extends T> i = stream.iterator();
+            while (i.hasNext())
+                nb.accept(i.next());
+            nb.end();
+
+            return nb;
+        }
+    }
+
+    @SafeVarargs
+    public static<T> Node<T> node(Node<T>... nodes) {
+        Objects.requireNonNull(nodes);
+        if (nodes.length < 2) {
+            // @@@ The signature could be (Node<T> n1, Node<T> n2, Node<T>... rest)
+            //     but requires more work to create the final array
+            throw new IllegalArgumentException("The number of nodes must be > 1");
+        }
+        return new ConcNode<>(nodes);
+    }
+
+    private static class ConcNode<T> implements Node<T> {
         final Node<T>[] nodes;
         int size = 0;
 
-        private ConcNodeImpl(Node<T>[] nodes) {
+        private ConcNode(Node<T>[] nodes) {
             this.nodes = nodes;
         }
 
+        // Node
+
+        @Override
+        public Spliterator<T> spliterator() {
+            return new ConcNodeSpliterator<>(this);
+        }
+
+        @Override
+        public int getChildCount() {
+            return nodes.length;
+        }
+
+        @Override
+        public Iterator<Node<T>> children() {
+            // @@@ This is more efficient than Arrays.iterator(nodes)
+            //     which should be updated to not create an iterable then an iterator
+            return new Iterator<Node<T>>() {
+                int i = 0;
+
+                @Override
+                public boolean hasNext() {
+                    return i < nodes.length;
+                }
+
+                @Override
+                public Node<T> next() {
+                    try {
+                        return nodes[i++];
+                    } catch (IndexOutOfBoundsException e) {
+                        throw new NoSuchElementException();
+                    }
+                }
+            };
+        }
+
+        @Override
+        public Node<T> flatten() {
+            return TreeUtils.flatten(this);
+        }
+
+        @Override
+        public void copyInto(T[] array, int offset) {
+            Objects.requireNonNull(array);
+            TreeUtils.copyTo(this, array, offset);
+        }
+
+        @Override
+        public T[] asArray() {
+            return flatten().asArray();
+        }
+
+        // Traversable
+
+        @Override
+        public void forEach(Block<? super T> block) {
+            for (Node<T> n : nodes)
+                n.forEach(block);
+        }
+
+        // Iterable
+
+        @Override
+        public Iterator<T> iterator() {
+            // @@@ Should do a depth first search and accumulate the leaf nodes then concat the iterators
+            return Iterators.concat(Arrays.stream(nodes).map(n -> n.iterator()).iterator());
+        }
+
+        // Sized
+
         @Override
         public int size() {
             if (size == 0) {
@@ -130,50 +361,23 @@
             return size;
         }
 
-        @Override
-        public void forEach(Block<? super T> block) {
-            for (Node<T> n : nodes)
-                n.forEach(block);
-        }
-
-        @Override
-        public Iterator<T> iterator() {
-            return Iterators.concat(Arrays.stream(nodes).map(n -> n.iterator()).iterator());
-        }
-
-        @Override
-        public Spliterator<T> spliterator() {
-            return new ConcNodeSpliterator<>(this);
-        }
-
-        @Override
-        public void copyTo(T[] array, int offset) {
-            int s = 0;
-            for (Node<T> n : nodes) {
-                n.copyTo(array, offset+s);
-                s += n.size();
-            }
-        }
+        //
 
         @Override
         public String toString() {
             return String.format("ConcNode[%s]", Arrays.stream(nodes).map(n -> n.toString()).into(new StringJoiner(",")).toString());
         }
 
-        @Override
-        public Node<T>[] nodes() {
-            return nodes;
-        }
-
-        private static class ConcNodeSpliterator<T> implements Spliterator<T>, Iterator<T> {
+        private static class ConcNodeSpliterator<T> implements Spliterator<T> {
             private Node<T> cur;
+            private Iterator<Node<T>> children;
             private int splitsLeft;
-            private int nextSplitIndex;
             private Iterator<T> iterator;
 
-            private ConcNodeSpliterator(ConcNodeImpl<T> cur) {
+            private ConcNodeSpliterator(ConcNode<T> cur) {
                 this.cur = cur;
-                splitsLeft = cur.nodes.length - 1;
+                this.children = cur.children();
+                this.splitsLeft = cur.getChildCount() - 1;
             }
 
             public Iterator<T> iterator() {
@@ -191,21 +395,19 @@
             public Spliterator<T> split() {
                 if (iterator != null)
                     throw new IllegalStateException("split after iterate");
-                else if (splitsLeft == 0 || !(cur instanceof ConcNode))
+                else if (splitsLeft == 0)
                     return Streams.emptySpliterator();
                 else {
-                    Node<T>[] nodes = ((ConcNode<T>) cur).nodes();
-                    Spliterator<T> ret = nodes[nextSplitIndex++].spliterator();
+                    Spliterator<T> ret = children.next().spliterator();
                     if (--splitsLeft == 0) {
-                        cur = nodes[nextSplitIndex];
-                        if (cur instanceof ConcNode) {
-                            Node<T>[] newNodes  = ((ConcNode<T>) cur).nodes();
-                            splitsLeft = newNodes.length - 1;
-                            nextSplitIndex = 0;
+                        cur = children.next();
+                        if (cur.getChildCount() > 0) {
+                            children = cur.children();
+                            splitsLeft = cur.getChildCount() - 1;
                         }
                         else {
+                            children = Collections.emptyIterator();
                             splitsLeft = 0;
-                            nextSplitIndex = -1;
                         }
                     }
                     return ret;
@@ -233,16 +435,6 @@
             public boolean isPredictableSplits() {
                 return true;
             }
-
-            @Override
-            public boolean hasNext() {
-                return iterator().hasNext();
-            }
-
-            @Override
-            public T next() {
-                return iterator().next();
-            }
         }
     }
 
@@ -269,41 +461,40 @@
         return new SimpleSpinedNodeBuilder<>();
     }
 
-    public static <T> NodeBuilder<T> makeBuilderWithInitialSize(int initialSize) {
-        return new SimpleSpinedNodeBuilder<>(initialSize);
-    }
-
-    private static class FixedNodeBuilder<T> extends ArrayNode<T> implements NodeBuilder<T> {
+    private static class FixedNodeBuilder<T> extends ArrayNode<T> implements NodeBuilder.Fixed<T> {
 
         private FixedNodeBuilder(int size) {
             super(size);
         }
 
         @Override
-        public Collection<T> asCollection() {
-            return this;
+        public T[] getContent() {
+            return array;
         }
 
         //
 
         @Override
-        public Stream<T> stream() {
-            return Streams.stream(this, size());
+        public void clear() {
+            curSize = 0;
         }
 
         @Override
-        public Stream<T> parallel() {
-            return Streams.parallel(spliterator(), size());
+        public void forEachUpdate(UnaryOperator<T> f) {
+            for (int i = 0; i < curSize; i++) {
+                array[i] = f.operate(array[i]);
+            }
         }
 
         //
 
         @Override
         public void begin(int size) {
-            curSize = 0;
             if (size > array.length) {
                 Logger.getLogger(Nodes.class.getName()).warning("Estimate greater than length. There might be blood.");
             }
+
+            clear();
         }
 
         @Override
@@ -340,7 +531,7 @@
         }
     }
 
-    private static class SimpleSpinedNodeBuilder<T> extends AbstractCollection<T> implements NodeBuilder<T> {
+    private static class SimpleSpinedNodeBuilder<T> implements NodeBuilder<T> {
 
         private final static int CHUNKS_SIZE = 32;
         private final static int CHUNK_SIZE = 1024;
@@ -369,24 +560,74 @@
         }
 
         @Override
-        public Collection<T> asCollection() {
+        public void clear() {
+            size = 0;
+        }
+
+        @Override
+        public void forEachUpdate(UnaryOperator<T> f) {
+            int finalChunk = size / CHUNK_SIZE;
+            int finalIndex = size % CHUNK_SIZE;
+
+            for (int chunkIndex = 0; chunkIndex < finalChunk; chunkIndex++) {
+                T[] chunk = chunks[chunkIndex];
+                for (int i = 0; i < chunk.length; i++) {
+                    chunk[i] = f.operate(chunk[i]);
+                }
+            }
+
+            if (finalIndex > 0) {
+                // final chunk.
+                T[] partialChunk = chunks[finalChunk];
+                for (int index = 0; index < finalIndex; index++) {
+                    partialChunk[index] = f.operate(partialChunk[index]);
+                }
+            }
+        }
+
+        // Node
+
+        @Override
+        public Node flatten() {
             return this;
         }
 
         @Override
         public Spliterator<T> spliterator() {
             // @@@ Not very efficient
-            return Arrays.spliterator((T[]) toArray());
+            return Arrays.spliterator(asArray());
         }
 
         @Override
-        public void copyTo(T[] array, int offset) {
-            int i = offset;
+        public void copyInto(T[] array, int offset) {
+            Objects.requireNonNull(array);
+            // @@@ Not very efficient
             for (T t : this)
-                array[i++] = t;
+                array[offset++] = t;
         }
 
-        //
+        @Override
+        public T[] asArray() {
+            T[] result = (T[]) new Object[size];
+
+            int finalChunk = size / CHUNK_SIZE;
+            int finalIndex = size % CHUNK_SIZE;
+            int index = 0;
+            int chunk = 0;
+
+            // full chunks
+            while (chunk < finalChunk) {
+                System.arraycopy(chunks[chunk++], 0, result, index, CHUNK_SIZE);
+                index += CHUNK_SIZE;
+            }
+
+            // final bit.
+            System.arraycopy(chunks[chunk], 0, result, index, finalIndex);
+
+            return result;
+        }
+
+        // Sink
 
         @Override
         public void begin(int size) {
@@ -413,20 +654,19 @@
             // @@@ check begin(size) and size
         }
 
+        // Sized
+
         @Override
         public int size() {
             return size;
         }
 
         @Override
-        public Stream<T> stream() {
-            return Streams.stream(this, size());
+        public boolean isEmpty() {
+            // Empty iff no elements have been accepted (size tracks accepted element count).
+            return size == 0;
         }
 
-        @Override
-        public Stream<T> parallel() {
-            return Streams.parallel(spliterator(), size());
-        }
+        // Traversable
 
         @Override
         public void forEach(Block<? super T> block) {
@@ -448,6 +688,8 @@
             }
         }
 
+        // Iterable
+
         @Override
         public Iterator<T> iterator() {
             Iterator<T> result = new Iterator<T>() {
@@ -482,36 +724,7 @@
             return result;
         }
 
-        @Override
-        public void clear() {
-            size = 0;
-        }
-
-        @Override
-        public Object[] toArray() {
-            Object[] result = new Object[size];
-
-            int finalChunk = size / CHUNK_SIZE;
-            int finalIndex = size % CHUNK_SIZE;
-            int index = 0;
-            int chunk = 0;
-
-            // full chunks
-            while (chunk < finalChunk) {
-                System.arraycopy(chunks[chunk++], 0, result, index, CHUNK_SIZE);
-                index += CHUNK_SIZE;
-            }
-
-            // final bit.
-            System.arraycopy(chunks[chunk], 0, result, index, finalIndex);
-
-            return result;
-        }
-
-        @Override
-        public boolean isEmpty() {
-            return size != 0;
-        }
+        //
 
         @Override
         public int hashCode() {
@@ -527,7 +740,7 @@
 
         @Override
         public String toString() {
-            return String.format("SimpleSpinedNodeBuilder[%s]", Arrays.toString(toArray()));
+            return String.format("SimpleSpinedNodeBuilder[%s]", Arrays.toString(asArray()));
         }
     }
 
--- a/src/share/classes/java/util/streams/ops/StatefulOp.java	Fri Oct 19 17:27:49 2012 -0400
+++ b/src/share/classes/java/util/streams/ops/StatefulOp.java	Sat Oct 20 13:44:10 2012 -0400
@@ -20,6 +20,7 @@
 
     @Override
     <P_IN> Node<E_OUT> evaluateSequential(PipelineHelper<P_IN, E_IN> helper) default {
+        // @@@ Can we determine the size from the pipeline and this operation?
         final NodeBuilder<E_OUT> nb = Nodes.makeBuilder();
         helper.wrapInto(wrapSink(nb));
         return nb;
--- a/src/share/classes/java/util/streams/ops/ToArrayOp.java	Fri Oct 19 17:27:49 2012 -0400
+++ b/src/share/classes/java/util/streams/ops/ToArrayOp.java	Sat Oct 20 13:44:10 2012 -0400
@@ -27,6 +27,7 @@
 import java.util.Arrays;
 import java.util.streams.ParallelPipelineHelper;
 import java.util.streams.PipelineHelper;
+import java.util.streams.StreamAccessor;
 import java.util.streams.TerminalSink;
 
 /**
@@ -48,12 +49,12 @@
     public TerminalSink<T, Object[]> sink() {
         return new TerminalSink<T, Object[]>() {
             private static final int DEFAULT_CHUNK = 16;
-            Object[] elements;
+            T[] elements;
             int count = 0;
 
             @Override
             public void begin(int size) {
-                elements = new Object[size >= 0 ? size : DEFAULT_CHUNK];
+                elements = (T[]) new Object[size >= 0 ? size : DEFAULT_CHUNK];
                 count = 0;
             }
 
@@ -72,7 +73,7 @@
                 if (count == 0)
                     return EMPTY_ARRAY;
                 else {
-                    Object[] result = (count == elements.length)
+                    T[] result = (count == elements.length)
                                       ? elements
                                       : Arrays.copyOf(elements, count);
                     elements = null;
@@ -90,19 +91,19 @@
 
     @Override
     public <P_IN> Object[] evaluateParallel(ParallelPipelineHelper<P_IN, T> helper) {
-        // @@@ If the previous op is a stateful op then can optimize by getting direct access to the
-        //     node, if that node is flat then the array can be extracted, otherwise if the node is a tree
-        //     then that tree can be flattened, in parallel, using the ToArrayTask and then the array can be extracted
-        //     Require Spliterator.toArray with default method
+        // If there are no intermediate ops
+        if (helper.hasZeroDepth()) {
+            // If the stream accessor holds a node
+            // @@@ can abstract with StreamAccessor.asArray ?
+            StreamAccessor<P_IN> sa = helper.getStreamAccessor();
+            if (sa instanceof Nodes.NodeStreamAccessor) {
+                Node<P_IN> node = ((Nodes.NodeStreamAccessor)sa).asNode();
+                return node.flatten().asArray();
+            }
+        }
 
-        // Ensure tree is flattened
-        Node<T> node = TreeUtils.collect(helper, true);
-        @SuppressWarnings("unchecked")
-        T[] array = (T[]) new Object[node.size()];
-
-        // @@@ Should have a toArray or asArray method that avoids copying if the impl supports it
-        //     i.e. a phase change for node
-        node.copyTo(array, 0);
-        return array;
+        // Collect and ensure tree is flattened
+        Node<T> node = helper.collect(true);
+        return node.asArray();
     }
 }
--- a/src/share/classes/java/util/streams/ops/TreeUtils.java	Fri Oct 19 17:27:49 2012 -0400
+++ b/src/share/classes/java/util/streams/ops/TreeUtils.java	Sat Oct 20 13:44:10 2012 -0400
@@ -25,11 +25,12 @@
 package java.util.streams.ops;
 
 import java.util.Arrays;
+import java.util.Iterator;
 import java.util.concurrent.CountedCompleter;
+import java.util.concurrent.ForkJoinUtils;
 import java.util.streams.ParallelPipelineHelper;
 import java.util.streams.PipelineHelper;
 import java.util.streams.Spliterator;
-import java.util.streams.StreamOpFlags;
 
 /**
  * Collector
@@ -44,56 +45,44 @@
     public static <P_IN, P_OUT> Node<P_OUT> collect(ParallelPipelineHelper<P_IN, P_OUT> helper,
                                                     boolean flattenTree) {
         Spliterator<P_IN> spliterator = helper.spliterator();
-        int size = spliterator.getSizeIfKnown();
-        boolean noSplit = ((size <= helper.suggestTargetSize()) && (size >= 0))
-                          || (spliterator.getNaturalSplits() == 0);
-        boolean splitSizesKnown = StreamOpFlags.SIZED.isKnown(helper.getFlags());
+        boolean noSplit = helper.suggestNotSplit(spliterator);
         if (noSplit) {
-            NodeBuilder<P_OUT> builder;
-            if (size >= 0 && splitSizesKnown) {
-                builder = Nodes.makeBuilder(size);
-                helper.wrapInto(spliterator, builder);
-                return builder;
-            } else {
-                builder = Nodes.makeBuilder();
-                helper.wrapInto(spliterator, builder);
-                return builder;
-            }
+            NodeBuilder<P_OUT> builder = helper.makeBuilderFor(spliterator);
+            helper.wrapInto(spliterator, builder);
+            return builder;
         } else {
-            if (size != -1 && spliterator.isPredictableSplits() && splitSizesKnown) {
+            int size = spliterator.getSizeIfKnown();
+            if (size >= 0 && helper.getOutputSizeIfKnown() == size && spliterator.isPredictableSplits()) {
                 P_OUT[] array = (P_OUT[]) new Object[size];
-                helper.invoke(new SizedCollectorTask<>(spliterator, helper, helper.suggestTargetSize(), array));
+                helper.invoke(new SizedCollectorTask<>(spliterator, helper, array));
                 return Nodes.node(array);
             } else {
                 CollectorTask<P_IN, P_OUT> task = new CollectorTask<>(helper);
                 helper.invoke(task);
                 Node<P_OUT> node = task.getRawResult();
-                if (flattenTree) {
-                    P_OUT[] array = (P_OUT[]) new Object[node.size()];
-                    helper.invoke(new ToArrayTask<>(node, array, 0));
-                    return Nodes.node(array);
-                }
-                return node;
+
+                // @@@ using default F/J pool, will that be different from that used by helper.invoke?
+                return flattenTree ? flatten(node) : node;
             }
         }
     }
 
-    public static class CollectorOp<E_IN> implements TerminalOp<E_IN, Node<E_IN>> {
+    public static <T> Node<T> flatten(Node<T> node) {
+        if (node.getChildCount() > 0) {
+            T[] array = (T[]) new Object[node.size()];
+            ForkJoinUtils.defaultFJPool().invoke(new ToArrayTask<>(node, array, 0));
+            return Nodes.node(array);
+        } else {
+            return node;
+        }
+    }
 
-        private static CollectorOp<?> INSTANCE = new CollectorOp<>();
-
-        public static <E_IN> CollectorOp<E_IN> singleton() {
-            return (CollectorOp<E_IN>) INSTANCE;
-        }
-
-        @Override
-        public <P_IN> Node<E_IN> evaluateSequential(PipelineHelper<P_IN, E_IN> helper) {
-            throw new UnsupportedOperationException();
-        }
-
-        @Override
-        public <P_IN> Node<E_IN> evaluateParallel(ParallelPipelineHelper<P_IN, E_IN> helper) {
-            return TreeUtils.collect(helper, false);
+    public static <T> void copyTo(Node<T> node, T[] array, int offset) {
+        // @@@ Currently only used by Nodes.ConcNode
+        if (node.getChildCount() > 0) {
+            ForkJoinUtils.defaultFJPool().invoke(new ToArrayTask<>(node, array, offset));
+        } else {
+            node.copyInto(array, offset);
         }
     }
 
@@ -117,8 +106,7 @@
 
         @Override
         protected Node<U> doLeaf() {
-            boolean sizeKnown = StreamOpFlags.SIZED.isKnown(helper.getFlags());
-            NodeBuilder<U> builder = Nodes.makeBuilder(sizeKnown ? spliterator.getSizeIfKnown() : -1);
+            NodeBuilder<U> builder = helper.makeBuilderFor(spliterator);
             helper.wrapInto(spliterator, builder);
             return builder;
         }
@@ -139,15 +127,13 @@
     private static class SizedCollectorTask<T, U> extends CountedCompleter<Void> {
         private final Spliterator<T> spliterator;
         private final ParallelPipelineHelper<T, U> helper;
-        private final long targetSize;
         private final U[] array;
         private int offset;
         private int length;
 
-        private SizedCollectorTask(Spliterator<T> spliterator, ParallelPipelineHelper<T, U> helper, long targetSize, U[] array) {
+        private SizedCollectorTask(Spliterator<T> spliterator, ParallelPipelineHelper<T, U> helper, U[] array) {
             this.spliterator = spliterator;
             this.helper = helper;
-            this.targetSize = targetSize;
             this.array = array;
             this.offset = 0;
             this.length = array.length;
@@ -157,7 +143,6 @@
             super(parent);
             this.spliterator = spliterator;
             this.helper = parent.helper;
-            this.targetSize = parent.targetSize;
             this.array = parent.array;
             this.offset = offset;
             this.length = length;
@@ -171,14 +156,13 @@
 
         @Override
         public void compute() {
-            int remaining = spliterator.estimateSize();
-            int naturalSplits = spliterator.getNaturalSplits();
-            boolean isLeaf = ((remaining <= targetSize) && (remaining >= 0)) || (naturalSplits == 0);
+            boolean isLeaf = helper.suggestNotSplit(spliterator);
             if (isLeaf) {
                 helper.wrapInto(spliterator, Arrays.sink(array, offset, length));
                 tryComplete();
             }
             else {
+                int naturalSplits = spliterator.getNaturalSplits();
                 setPendingCount(naturalSplits);
                 int s = 0;
                 for (int i = 0; i < naturalSplits; i++) {
@@ -217,21 +201,23 @@
 
         @Override
         public void compute() {
-            if (node instanceof Node.ConcNode) {
-                Node<T>[] nodes = ((Node.ConcNode<T>) node).nodes();
-                setPendingCount(nodes.length - 1);
-                ToArrayTask<T> firstTask = new ToArrayTask<>(this, nodes[0], offset);
-                int size = nodes[0].size();
-                for (int i = 1; i < nodes.length; i++) {
-                    ToArrayTask<T> task = new ToArrayTask<>(this, nodes[i], offset + size);
+            if (node.getChildCount() > 0) {
+                setPendingCount(node.getChildCount() - 1);
+
+                final Iterator<Node<T>> itNodes = node.children();
+
+                final ToArrayTask<T> firstTask = new ToArrayTask<>(this, itNodes.next(), offset);
+                int size = firstTask.node.size();
+
+                while (itNodes.hasNext()) {
+                    final ToArrayTask<T> task = new ToArrayTask<>(this, itNodes.next(), offset + size);
+                    size += task.node.size();
+
                     task.fork();
-
-                    size += nodes[i].size();
                 }
                 firstTask.compute();
-            }
-            else {
-                node.copyTo(array, offset);
+            } else {
+                node.copyInto(array, offset);
                 tryComplete();
             }
         }
--- a/src/share/classes/java/util/streams/ops/UniqOp.java	Fri Oct 19 17:27:49 2012 -0400
+++ b/src/share/classes/java/util/streams/ops/UniqOp.java	Sat Oct 20 13:44:10 2012 -0400
@@ -139,7 +139,6 @@
 
         OpUtils.forEach(helper, sinkChain);
 
-        // @@@ Not very efficient
-        return Nodes.node((T[])map.keySet().toArray());
+        return Nodes.node(map.keySet());
     }
 }
--- a/test-ng/tests/org/openjdk/tests/java/util/streams/ops/ConcatOpTest.java	Fri Oct 19 17:27:49 2012 -0400
+++ b/test-ng/tests/org/openjdk/tests/java/util/streams/ops/ConcatOpTest.java	Sat Oct 20 13:44:10 2012 -0400
@@ -109,13 +109,13 @@
     @Test(dataProvider = "opArrays", dataProviderClass = StreamTestDataProvider.class)
     public void testOpsSequential(String name, TestData<Integer> data) {
         testUsingData(data).
-                excerciseSingleOpFactory(() -> ConcatOp.make(data.seqStream()));
+                excerciseSingleOpFactory(() -> new ConcatOp<>(data.seqStream()));
     }
 
     @Test(dataProvider = "opArrays", dataProviderClass = StreamTestDataProvider.class)
     public void testOpsParallel(String name, TestData<Integer> data) {
         testUsingData(data).
-                excerciseSingleOpFactory(() -> ConcatOp.make(data.parStream()));
+                excerciseSingleOpFactory(() -> new ConcatOp<>(data.parStream()));
     }
 }
 
--- a/test-ng/tests/org/openjdk/tests/java/util/streams/ops/CumulateOpTest.java	Fri Oct 19 17:27:49 2012 -0400
+++ b/test-ng/tests/org/openjdk/tests/java/util/streams/ops/CumulateOpTest.java	Sat Oct 20 13:44:10 2012 -0400
@@ -27,7 +27,12 @@
 import org.openjdk.tests.java.util.streams.StreamTestDataProvider;
 import org.testng.annotations.Test;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
 import java.util.streams.ops.CumulateOp;
+import java.util.streams.ops.FilterOp;
+import java.util.streams.ops.TeeOp;
 
 import static org.openjdk.tests.java.util.LambdaTestHelpers.*;
 
@@ -63,4 +68,10 @@
         result = exerciseOps(data, new CumulateOp<>(rMax));
         assertEquals(result.size(), data.size());
     }
+
+    @Test(dataProvider = "opArrays", dataProviderClass = StreamTestDataProvider.class)
+    public void testWithFilter(String name, TestData<Integer> data) {
+        exerciseOps(data, new FilterOp<>(pEven), new CumulateOp<>(rPlus));
+        exerciseOps(data, new FilterOp<Integer>(x -> x == -1), new CumulateOp<>(rPlus));
+    }
 }
--- a/test-ng/tests/org/openjdk/tests/java/util/streams/ops/GroupByOpTest.java	Fri Oct 19 17:27:49 2012 -0400
+++ b/test-ng/tests/org/openjdk/tests/java/util/streams/ops/GroupByOpTest.java	Sat Oct 20 13:44:10 2012 -0400
@@ -111,6 +111,18 @@
         assertEquals(mbResult.keySet().size(), Math.min(2, uniqueSize));
     }
 
+    @Test(dataProvider = "opArrays", dataProviderClass = StreamTestDataProvider.class)
+    public void testWithValueFactory(String name, TestData<Integer> data) {
+        Map<Integer, Collection<Integer>> miResult = exerciseGroupByOps(data, new GroupByOp<Integer, Integer>(mId, LinkedList::new));
+
+        Set<Class<?>> classes = miResult.values().stream().map(e -> e.getClass()).into(new HashSet<Class<?>>());
+
+        if (data.size() > 0) {
+            assertEquals(classes.size(), 1);
+            assertEquals(LinkedList.class, classes.iterator().next());
+        }
+    }
+
     <T, K> Map<K, Collection<T>> exerciseGroupByOps(TestData<T> data, GroupByOp<T, K> gbop) {
         return exerciseOps(data, this::multiMapEquals, gbop);
     }
@@ -130,4 +142,5 @@
 
         return true;
     }
+
 }
--- a/test-ng/tests/org/openjdk/tests/java/util/streams/ops/NodeBuilderTest.java	Fri Oct 19 17:27:49 2012 -0400
+++ b/test-ng/tests/org/openjdk/tests/java/util/streams/ops/NodeBuilderTest.java	Sat Oct 20 13:44:10 2012 -0400
@@ -29,43 +29,59 @@
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Iterator;
 import java.util.List;
+import java.util.functions.Mapper;
 import java.util.streams.ops.NodeBuilder;
 import java.util.streams.ops.Nodes;
 
 import static org.openjdk.tests.java.util.LambdaTestHelpers.assertContents;
 import static org.openjdk.tests.java.util.LambdaTestHelpers.countTo;
+import static org.testng.Assert.assertEquals;
 
 @Test
 public class NodeBuilderTest {
 
     @DataProvider(name = "sizes")
     public Object[][] createSizes() {
-        List<Integer> l = Arrays.asList(0, 1, 4, 16, 256,
-                                        1023, 1024, 1025,
-                                        2047, 2048, 2049,
-                                        1024 * 32 - 1, 1024 * 32, 1024 * 32 + 1);
+        List<Integer> sizes = Arrays.asList(0, 1, 4, 16, 256,
+                                            1023, 1024, 1025,
+                                            2047, 2048, 2049,
+                                            1024 * 32 - 1, 1024 * 32, 1024 * 32 + 1);
 
-        Object[][] params = new Object[l.size()][];
-        for (int i = 0; i < l.size(); i++) {
-            params[i] = new Object[]{new Integer(l.get(i))};
+        List<List<Integer>> ls = new ArrayList<>();
+        for (Integer size : sizes) {
+            ls.add(countTo(size));
+        }
+
+        List<Mapper<Integer, NodeBuilder<Integer>>> ms = Arrays.<Mapper<Integer, NodeBuilder<Integer>>>asList(
+                s -> Nodes.makeBuilder(),
+                s -> Nodes.makeBuilder(s)
+        );
+
+        Object[][] params = new Object[ls.size() * ms.size()][];
+        int i = 0;
+        for (List<Integer> l : ls) {
+            for (Mapper<Integer, NodeBuilder<Integer>> m : ms) {
+                params[i++] = new Object[]{l, m};
+            }
         }
 
         return params;
     }
 
     @Test(dataProvider = "sizes")
-    public void testIteration(int size) {
-        NodeBuilder<Integer> sb = Nodes.makeBuilder();
+    public void testIteration(List<Integer> l, Mapper<Integer, NodeBuilder<Integer>> m) {
+        NodeBuilder<Integer> nb = m.map(l.size());
+        for (int i : l) {
+            nb.accept(i);
+        }
 
-        List<Integer> l = countTo(size);
-        for (int i : l) {
-            sb.accept(i);
-        }
+        assertEquals(nb.size(), l.size());
 
         {
             List<Integer> _l = new ArrayList<>();
-            sb.forEach(e -> {
+            nb.forEach(e -> {
                 _l.add(e);
             });
 
@@ -74,11 +90,53 @@
 
         {
             List<Integer> _l = new ArrayList<>();
-            for (int i : sb) {
+            for (int i : nb) {
                 _l.add(i);
             }
 
             assertContents(_l, l);
         }
     }
+
+    @Test(dataProvider = "sizes")
+    public void testUpdate(List<Integer> l, Mapper<Integer, NodeBuilder<Integer>> m) {
+        NodeBuilder<Integer> nb = m.map(l.size());
+        for (int i : l) {
+            nb.accept(i);
+        }
+
+        // Negate each element
+        nb.forEachUpdate(e -> -e);
+
+        // Sum of original list and builder must be 0
+        int sum = 0;
+        Iterator<Integer> lit = l.iterator();
+        for (int i : nb) {
+            sum += i + lit.next();
+        }
+
+        assertEquals(sum, 0);
+    }
+
+    @Test(dataProvider = "sizes")
+    public void testClear(List<Integer> l, Mapper<Integer, NodeBuilder<Integer>> m) {
+        NodeBuilder<Integer> nb = m.map(l.size());
+        for (int i : l) {
+            nb.accept(i);
+        }
+
+        nb.clear();
+
+        for (int i : l) {
+            nb.accept(1);
+        }
+
+        int product = 1;
+
+        for (int i : nb) {
+            product *= i;
+        }
+
+        assertEquals(product, 1);
+    }
 }
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/test-ng/tests/org/openjdk/tests/java/util/streams/ops/NodeTest.java	Sat Oct 20 13:44:10 2012 -0400
@@ -0,0 +1,166 @@
+/*
+ * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.  Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+package org.openjdk.tests.java.util.streams.ops;
+
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.functions.Mapper;
+import java.util.streams.Spliterator;
+import java.util.streams.Stream;
+import java.util.streams.Streamable;
+import java.util.streams.ops.Node;
+import java.util.streams.ops.NodeBuilder;
+import java.util.streams.ops.Nodes;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+
+@Test
+public class NodeTest {
+
+    @DataProvider(name = "nodes")
+    public Object[][] createNodes() {
+        Integer[] array = new Integer[100];
+        for (int i = 0; i < array.length; i++) {
+            array[i] = i;
+        }
+
+        List<Node<Integer>> nodes = new ArrayList<>();
+        nodes.add(Nodes.node(array));
+        nodes.add(Nodes.node(Arrays.asList(array)));
+        nodes.add(Nodes.node((Streamable<Stream<Integer>>) Arrays.asList(array)));
+        nodes.add(Nodes.node(Arrays.asList(array).stream()));
+        nodes.add(Nodes.node(Arrays.asList(array).parallel()));
+        nodes.add(degenerateTree(Arrays.asList(array).iterator()));
+        nodes.add(tree(Arrays.asList(array), l -> Nodes.node(l.toArray(new Integer[l.size()]))));
+        nodes.add(tree(Arrays.asList(array), l -> Nodes.node(l)));
+        nodes.add(fill(array, Nodes.<Integer>makeBuilder(array.length)));
+        nodes.add(fill(array, Nodes.<Integer>makeBuilder()));
+
+        Object[][] params = new Object[nodes.size()][];
+        for (int i = 0; i < nodes.size(); i++) {
+            params[i] = new Object[]{array, nodes.get(i)};
+        }
+
+        return params;
+    }
+
+    Node<Integer> fill(Integer[] array, NodeBuilder<Integer> nb) {
+        for (Integer i : array) {
+            nb.accept(i);
+        }
+        return nb;
+    }
+
+    Node<Integer> degenerateTree(Iterator<Integer> it) {
+        Integer i = it.next();
+        if (it.hasNext()) {
+            return Nodes.node(Nodes.node(new Integer[]{i}), degenerateTree(it));
+        }
+        else {
+            return Nodes.node(new Integer[]{i});
+        }
+    }
+
+    Node<Integer> tree(List<Integer> l, Mapper<List<Integer>, Node<Integer>> m) {
+        if (l.size() < 3) {
+            return m.map(l);
+        }
+        else {
+            return Nodes.node(tree(l.subList(0, l.size() / 2), m), tree(l.subList(l.size() / 2, l.size()), m));
+        }
+    }
+
+    @Test(dataProvider = "nodes")
+    public void testAsArray(Integer[] array, Node<Integer> n) {
+        assertEquals(n.asArray(), array);
+    }
+
+    @Test(dataProvider = "nodes")
+    public void testFlattenAsArray(Integer[] array, Node<Integer> n) {
+        assertEquals(n.flatten().asArray(), array);
+    }
+
+    @Test(dataProvider = "nodes")
+    public void testCopyTo(Integer[] array, Node<Integer> n) {
+        Integer[] copy = new Integer[n.size()];
+        n.copyInto(copy, 0);
+
+        assertEquals(copy, array);
+    }
+
+    @Test(dataProvider = "nodes")
+    public void testForEach(Integer[] array, Node<Integer> n) {
+        List<Integer> l = new ArrayList<>(n.size());
+        n.forEach(e -> {
+            l.add(e);
+        });
+
+        assertEquals(l.toArray(), array);
+    }
+
+    @Test(dataProvider = "nodes")
+    public void testIterator(Integer[] array, Node<Integer> n) {
+        List<Integer> l = new ArrayList<>(n.size());
+        Iterator<Integer> it = n.iterator();
+        while (it.hasNext()) {
+            l.add(it.next());
+        }
+
+        assertEquals(l.toArray(), array);
+    }
+
+    @Test(dataProvider = "nodes")
+    public void testSpliterator(Integer[] array, Node<Integer> n) {
+        List<Integer> l = new ArrayList<>(n.size());
+        split(l, n.spliterator());
+
+        assertEquals(l.toArray(), array);
+    }
+
+    void split(List<Integer> l, Spliterator<Integer> s) {
+        if (s.getNaturalSplits() == 0) {
+            Spliterator<Integer> _s = s.split();
+            assertEquals(_s.getNaturalSplits(), 0);
+            assertFalse(_s.split().iterator().hasNext());
+
+            s.forEach(e -> {
+                l.add(e);
+            });
+        }
+        else {
+            for (int i = 0; i < s.getNaturalSplits(); i++) {
+                split(l, s.split());
+            }
+            split(l, s);
+        }
+    }
+
+}
--- a/test-ng/tests/org/openjdk/tests/java/util/streams/ops/ToArrayOpTest.java	Fri Oct 19 17:27:49 2012 -0400
+++ b/test-ng/tests/org/openjdk/tests/java/util/streams/ops/ToArrayOpTest.java	Sat Oct 20 13:44:10 2012 -0400
@@ -27,11 +27,11 @@
 import org.openjdk.tests.java.util.streams.StreamTestDataProvider;
 import org.testng.annotations.Test;
 
+import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.streams.ops.FlatMapOp;
-import java.util.streams.ops.IntermediateOp;
-import java.util.streams.ops.MapOp;
-import java.util.streams.ops.ToArrayOp;
+import java.util.List;
+import java.util.streams.ValuePipeline;
+import java.util.streams.ops.*;
 
 import static org.openjdk.tests.java.util.LambdaTestHelpers.*;
 
@@ -42,6 +42,7 @@
  */
 @Test
 public class ToArrayOpTest extends StreamOpTestCase {
+
     public void testRawIterator() {
         ToArrayOp<Integer> op = ToArrayOp.singleton();
         assertCountSum(Arrays.asList(iteratorToStatefulSink(countTo(0).iterator(), op.sink())), 0, 0);
@@ -77,4 +78,76 @@
                                        }));
         assertTrue(objects.length == data.size() * 2);
     }
+
+    @Test(dataProvider = "opArrays", dataProviderClass = StreamTestDataProvider.class)
+    public void testOpsWithSorted(String name, TestData<Integer> data) {
+        // Retain the size of the source
+        // This should kick in the parallel evaluation optimization for tasks stuffing elements into a shared array
+        Object[] objects = exerciseOps(data, Arrays::equals, ToArrayOp.<Integer>singleton(),
+                                       new SortedOp<Integer>());
+        assertTrue(objects.length == data.size());
+    }
+
+    @Test(dataProvider = "opArrays", dataProviderClass = StreamTestDataProvider.class)
+    public void testOpsWithCumulate(String name, TestData<Integer> data) {
+        // Retain the size of the source
+        // This should kick in the parallel evaluation optimization for tasks stuffing elements into a shared array
+        Object[] objects = exerciseOps(data, Arrays::equals, ToArrayOp.<Integer>singleton(),
+                                       new CumulateOp<Integer>((a, b) -> a + b));
+        assertTrue(objects.length == data.size());
+    }
+
+    @Test(dataProvider = "opArrays", dataProviderClass = StreamTestDataProvider.class)
+    public void testWithNode(String name, TestData<Integer> data) {
+        List<Integer> l = new ArrayList<>();
+        for (Integer i : data) {
+            l.add(i);
+        }
+
+        {
+            Node<Integer> node = Nodes.node(l);
+            Object[] output = new ValuePipeline<>(Nodes.toStreamAccessor(node)).toArray();
+            assertEquals(Arrays.asList(output), l);
+        }
+
+        {
+            Node<Integer> node = Nodes.node(l.toArray(new Integer[l.size()]));
+            Object[] output = new ValuePipeline<>(Nodes.toStreamAccessor(node)).toArray();
+            assertEquals(Arrays.asList(output), l);
+        }
+
+        {
+            Node<Integer> node = tree(l);
+            Object[] output = new ValuePipeline<>(Nodes.toStreamAccessor(node)).toArray();
+            assertEquals(Arrays.asList(output), l);
+        }
+
+        {
+            NodeBuilder<Integer> node = Nodes.makeBuilder(l.size());
+            for (Integer i : l) {
+                node.accept(i);
+            }
+            Object[] output = new ValuePipeline<>(Nodes.toStreamAccessor(node)).toArray();
+            assertEquals(Arrays.asList(output), l);
+        }
+
+        {
+            NodeBuilder<Integer> node = Nodes.makeBuilder();
+            for (Integer i : l) {
+                node.accept(i);
+            }
+            Object[] output = new ValuePipeline<>(Nodes.toStreamAccessor(node)).toArray();
+            assertEquals(Arrays.asList(output), l);
+        }
+    }
+
+    Node<Integer> tree(List<Integer> l) {
+        if (l.size() < 3) {
+            return Nodes.node(l.toArray(new Integer[l.size()]));
+        }
+        else {
+            return Nodes.node(tree(l.subList(0, l.size() / 2)), tree(l.subList(l.size() / 2, l.size())));
+        }
+    }
+
 }
--- a/test-ng/tests/org/openjdk/tests/java/util/streams/ops/UniqOpTest.java	Fri Oct 19 17:27:49 2012 -0400
+++ b/test-ng/tests/org/openjdk/tests/java/util/streams/ops/UniqOpTest.java	Sat Oct 20 13:44:10 2012 -0400
@@ -28,6 +28,8 @@
 import org.openjdk.tests.java.util.streams.StreamTestDataProvider;
 import org.testng.annotations.Test;
 
+import java.util.streams.ops.MapOp;
+import java.util.streams.ops.SortedOp;
 import java.util.streams.ops.UniqOp;
 
 import static org.openjdk.tests.java.util.LambdaTestHelpers.*;
@@ -55,7 +57,7 @@
     }
 
     @Test(dataProvider = "opArrays", dataProviderClass = StreamTestDataProvider.class)
-    public void testOps(String name, TestData<Integer> data) {
+    public void testOp(String name, TestData<Integer> data) {
         UniqOp<Integer> op = UniqOp.singleton();
 
         StreamResult<Integer> result = StreamOpTestCase.<Integer, Integer>testUsingData(data).
@@ -70,5 +72,17 @@
         assertTrue(result.size() <= data.size());
     }
 
+    @Test(dataProvider = "opArrays", dataProviderClass = StreamTestDataProvider.class)
+    public void testUniqSort(String name, TestData<Integer> data) {
+        StreamResult<Integer> result = StreamOpTestCase.<Integer, Integer>testUsingData(data).
+                parallelEqualator(LambdaTestHelpers::equalsContentsUnordered).
+                excerciseOps(UniqOp.singleton(), new SortedOp<>());
+    }
 
+    @Test(dataProvider = "opArrays", dataProviderClass = StreamTestDataProvider.class)
+    public void testSortUniq(String name, TestData<Integer> data) {
+        StreamResult<Integer> result = StreamOpTestCase.<Integer, Integer>testUsingData(data).
+                parallelEqualator(LambdaTestHelpers::equalsContentsUnordered).
+                excerciseOps(new SortedOp<>(), UniqOp.singleton());
+    }
 }