changeset 6075:6e0ca4cca828

Replace ParallelOp with EvaluableOp for sequential and parallel evaluation. Remove ParallelPipelineHelper.suggestDepth, as AbstractPipeline should, in the future, decide whether to evaluate sequentially or in parallel.
author psandoz
date Mon, 15 Oct 2012 15:06:03 -0700
parents 994010241295
children 9ee3890658f9
files src/share/classes/java/util/streams/AbstractPipeline.java
      src/share/classes/java/util/streams/ParallelPipelineHelper.java
      src/share/classes/java/util/streams/PipelineHelper.java
      src/share/classes/java/util/streams/ops/AbstractTask.java
      src/share/classes/java/util/streams/ops/ConcatOp.java
      src/share/classes/java/util/streams/ops/CumulateOp.java
      src/share/classes/java/util/streams/ops/EvaluableOp.java
      src/share/classes/java/util/streams/ops/FindAnyOp.java
      src/share/classes/java/util/streams/ops/FindFirstOp.java
      src/share/classes/java/util/streams/ops/FoldOp.java
      src/share/classes/java/util/streams/ops/ForEachOp.java
      src/share/classes/java/util/streams/ops/GroupByOp.java
      src/share/classes/java/util/streams/ops/LimitOp.java
      src/share/classes/java/util/streams/ops/MapLimitOp.java
      src/share/classes/java/util/streams/ops/MatchOp.java
      src/share/classes/java/util/streams/ops/ParallelOp.java
      src/share/classes/java/util/streams/ops/ReduceByOp.java
      src/share/classes/java/util/streams/ops/StatefulOp.java
      src/share/classes/java/util/streams/ops/TerminalOp.java
      src/share/classes/java/util/streams/ops/ToArrayOp.java
      src/share/classes/java/util/streams/ops/TreeUtils.java
diffstat 21 files changed, 215 insertions(+), 256 deletions(-)
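
For orientation, the op signature change this patch makes, summarized from the interfaces touched below (every signature appears verbatim in the diff):

    // Before: the source and the helper were passed separately (ParallelOp and the old TerminalOp)
    <P_IN> R evaluateParallel(StreamAccessor<P_IN> source, ParallelPipelineHelper<P_IN, E_IN> helper);
    <P_IN> R evaluateSequential(StreamAccessor<P_IN> source, PipelineHelper<P_IN, E_IN> helper);

    // After: the helper carries the StreamAccessor, so ops receive only the helper (EvaluableOp)
    <P_IN> R evaluateSequential(PipelineHelper<P_IN, E_IN> helper);
    <P_IN> R evaluateParallel(ParallelPipelineHelper<P_IN, E_IN> helper);  // default: log a warning, run sequentially
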
--- a/src/share/classes/java/util/streams/AbstractPipeline.java	Mon Oct 15 14:05:16 2012 -0700
+++ b/src/share/classes/java/util/streams/AbstractPipeline.java	Mon Oct 15 15:06:03 2012 -0700
@@ -84,13 +84,19 @@
     }
 
     protected<R> R evaluate(TerminalOp<E_OUT, R> terminal) {
-        return source.isParallel() ? evaluateParallel(terminal) : evaluateSerial(terminal);
+        // @@@ If the source size estimate is small, don't bother going parallel
+        if (source.isParallel()) {
+            return evaluateParallel(terminal);
+        }
+        else
+            return evaluateSerial(terminal);
     }
 
     protected<R> R evaluateParallel(TerminalOp<E_OUT, R> terminal) {
         // @@@ Need to check if any upstream streams have been pulled using iterator
         if (iterator != null) {
             // @@@ Is this assumption correct for all sources and pipelines?
+            // @@@ Can default to serial evaluation
             throw new IllegalStateException("A stream that has been iterated on (partially or otherwise) cannot be evaluated in parallel");
         }
 
@@ -117,36 +123,39 @@
 
     private <R> TreeUtils.Node<R> evaluateParallel(StreamAccessor source, IntermediateOp[] ops, int from, int to,
                                                    StatefulOp<?, R> terminal) {
-        return (TreeUtils.Node<R>) terminal.evaluateParallel(source, new ParallelImplPipelineHelper(source, ops, from, to));
+        return (TreeUtils.Node<R>) terminal.evaluateParallel(new ParallelImplPipelineHelper(source, ops, from, to));
     }
 
     private <R> R evaluateParallel(StreamAccessor<?> source, IntermediateOp[] ops, int from, int to,
                                    TerminalOp<E_OUT, R> terminal) {
-        return (R) terminal.evaluateParallel(source, new ParallelImplPipelineHelper(source, ops, from, to));
+        return (R) terminal.evaluateParallel(new ParallelImplPipelineHelper(source, ops, from, to));
     }
 
     protected <R> R evaluateSerial(TerminalOp<E_OUT, R> terminal) {
-        return (R) terminal.evaluateSequential(source, new SequentialImplPipelineHelper(terminal));
+        return (R) terminal.evaluateSequential(new SequentialImplPipelineHelper(source, terminal));
     }
 
-    static class AbstractPipelineHelper<P_IN, P_OUT> {
+    static abstract class AbstractPipelineHelper<P_IN, P_OUT> implements PipelineHelper<P_IN, P_OUT> {
+        final StreamAccessor<P_IN> source;
         final IntermediateOp[] ops;
         final int from;
         final int to;
         final boolean isIntermediateShortCircuit;
         final int flags;
 
-        AbstractPipelineHelper(int flags, IntermediateOp[] ops) {
-            this(flags, ops, 0, ops.length);
+        AbstractPipelineHelper(StreamAccessor<P_IN> source, IntermediateOp[] ops) {
+            this(source, ops, 0, ops.length);
         }
 
-        AbstractPipelineHelper(int flags, IntermediateOp[] ops, int from, int to) {
+        AbstractPipelineHelper(StreamAccessor<P_IN> source, IntermediateOp[] ops, int from, int to) {
             boolean isIntermediateShortCircuit = false;
+            int flags = source.getStreamFlags();
             for (int i = from; i < to; i++) {
                 isIntermediateShortCircuit |= ops[i].isShortCircuit();
                 flags = ops[i].getStreamFlags(flags);
             }
 
+            this.source = source;
             this.ops = ops;
             this.from = from;
             this.to = to;
@@ -158,6 +167,12 @@
             return flags;
         }
 
+        @Override
+        public StreamAccessor<P_IN> getStreamAccessor() {
+            return source;
+        }
+
+        @Override
         public Sink<P_IN, ?, ?> wrapSink(Sink sink) {
             Objects.requireNonNull(sink);
 
@@ -167,6 +182,7 @@
             return sink;
         }
 
+        @Override
         public Iterator<P_OUT> wrapIterator(Iterator it) {
             Objects.requireNonNull(it);
 
@@ -177,10 +193,10 @@
         }
     }
 
-    class SequentialImplPipelineHelper<P_IN> extends AbstractPipelineHelper<P_IN, E_OUT> implements PipelineHelper<P_IN, E_OUT> {
+    class SequentialImplPipelineHelper<P_IN> extends AbstractPipelineHelper<P_IN, E_OUT> {
 
-        <R> SequentialImplPipelineHelper(TerminalOp<E_OUT, R> terminal) {
-            super(source.getStreamFlags(), ops());
+        <R> SequentialImplPipelineHelper(StreamAccessor<P_IN> source, TerminalOp<E_OUT, R> terminal) {
+            super(source, ops());
 
             // @@@ Other potential IllegalStateException
             // @@@ If an infinite stream is input to a stateful intermediate op that is required to operate on all
@@ -194,21 +210,14 @@
         }
 
         @Override
-        public void into(StreamAccessor<P_IN> sa, Sink<E_OUT, ?, ?> sink) {
-            Objects.requireNonNull(sa);
+        public void into(Sink<E_OUT, ?, ?> sink) {
             Objects.requireNonNull(sink);
 
-            if (sa == source) {
-                // @@@ Need to check if any upstream streams have been pulled using iterator
-                if (isIntermediateShortCircuit || iterator != null) {
-                    into(AbstractPipeline.this.iterator(), sink);
-                }  else {
-                    sa.into(wrapSink(sink));
-                }
-            } else if (isIntermediateShortCircuit) {
-                into(wrapIterator(sa.iterator()), sink);
-            } else {
-                sa.into(wrapSink(sink));
+            // @@@ Need to check if any upstream streams have been pulled using iterator
+            if (isIntermediateShortCircuit || iterator != null) {
+                into(AbstractPipeline.this.iterator(), sink);
+            }  else {
+                getStreamAccessor().into(wrapSink(sink));
             }
         }
 
@@ -220,37 +229,19 @@
         }
 
         @Override
-        public Iterator<E_OUT> iterator(StreamAccessor<P_IN> sa) {
-            Objects.requireNonNull(sa);
-
-            if (sa == source) {
-                return AbstractPipeline.this.iterator();
-            } else {
-                return wrapIterator(sa.iterator());
-            }
+        public Iterator<E_OUT> iterator() {
+            return AbstractPipeline.this.iterator();
         }
     }
 
     class ParallelImplPipelineHelper<P_IN> extends AbstractPipelineHelper<P_IN, E_OUT> implements ParallelPipelineHelper<P_IN, E_OUT> {
-        final StreamAccessor<P_IN> source;
-
         ParallelImplPipelineHelper(StreamAccessor<P_IN> source, IntermediateOp[] ops, int from, int to) {
-            super(source.getStreamFlags(), ops, from, to);
-
-            this.source = source;
+            super(source, ops, from, to);
         }
 
         //
 
         @Override
-        public int suggestDepth() {
-            int estimate = source.estimateSize();
-            return estimate >= 0
-                   ? ForkJoinUtils.suggestDepth(estimate)
-                   : Integer.highestOneBit(ForkJoinUtils.defaultFJPool().getParallelism()) + 1;
-        }
-
-        @Override
         public long suggestTargetSize() {
             int estimate = source.estimateSize();
             return estimate >= 0
@@ -262,6 +253,16 @@
         public <FJ_R> FJ_R invoke(ForkJoinTask<FJ_R> task) {
             return ForkJoinUtils.defaultFJPool().invoke(task);
         }
+
+        @Override
+        public Iterator<E_OUT> iterator() {
+            return wrapIterator(source.iterator());
+        }
+
+        @Override
+        public Spliterator<P_IN> spliterator() {
+            return source.spliterator();
+        }
     }
 
     private IntermediateOp[] ops() {
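
The "@@@ If the source size estimate is small" note added to evaluate() above is not acted on in this patch. One possible shape for that check, sketched against the members visible in this file (the SEQUENTIAL_THRESHOLD constant is hypothetical and not part of the changeset):

    protected <R> R evaluate(TerminalOp<E_OUT, R> terminal) {
        // estimateSize() is the same estimate suggestTargetSize() consults; negative means unknown
        int estimate = source.estimateSize();
        if (source.isParallel() && (estimate < 0 || estimate >= SEQUENTIAL_THRESHOLD)) {
            return evaluateParallel(terminal);
        }
        else {
            return evaluateSerial(terminal);
        }
    }
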
--- a/src/share/classes/java/util/streams/ParallelPipelineHelper.java	Mon Oct 15 14:05:16 2012 -0700
+++ b/src/share/classes/java/util/streams/ParallelPipelineHelper.java	Mon Oct 15 15:06:03 2012 -0700
@@ -34,30 +34,17 @@
  */
 public interface ParallelPipelineHelper<P_IN, P_OUT> extends PipelineHelper<P_IN, P_OUT> {
 
-    /**
-     * // @@@ Should this take StreamAccessor as an argument?
-     * //     Currently the PipelineHelper is tied to the StreamAccessor
-     */
-    int suggestDepth();
-
-    /**
-     * // @@@ Should this take StreamAccessor as an argument?
-     * //     Currently the PipelineHelper is tied to the StreamAccessor
-     */
     long suggestTargetSize();
 
-    // @@@ If/when Spliterator/StreamAccessor share a common type this will go away
     <R> R evaluateSequential(Spliterator<P_IN> sp, TerminalSink<P_OUT, R> sink) default {
         into(sp, sink);
         return sink.getAndClearState();
     }
 
-    // @@@ If/when Spliterator/StreamAccessor share a common type this will go away
     void into(Spliterator<P_IN> sp, Sink<P_OUT, ?, ?> sink) default {
         sp.into(wrapSink(sink));
     }
 
-    // @@@ If/when Spliterator/StreamAccessor share a common type this will go away
     Iterator<P_OUT> iterator(Spliterator<P_IN> sp) default {
         return wrapIterator(sp.iterator());
     }
@@ -70,4 +57,6 @@
      * @return the fork/join result
      */
     <FJ_R> FJ_R invoke(ForkJoinTask<FJ_R> task);
+
+    Spliterator<P_IN> spliterator();
 }
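
With suggestDepth() gone, the splitting decision elsewhere in this patch (CumulateOp and TreeUtils.collect) is phrased against suggestTargetSize() and the spliterator itself. Pulled out as a standalone predicate for illustration (the helper method itself is not part of the changeset):

    static <P_IN> boolean isLeaf(ParallelPipelineHelper<P_IN, ?> helper, Spliterator<P_IN> spliterator) {
        int remaining = spliterator.estimateSize();   // negative when the size is unknown
        return ((remaining <= helper.suggestTargetSize()) && (remaining >= 0))
               || (spliterator.getNaturalSplits() == 0);
    }
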
--- a/src/share/classes/java/util/streams/PipelineHelper.java	Mon Oct 15 14:05:16 2012 -0700
+++ b/src/share/classes/java/util/streams/PipelineHelper.java	Mon Oct 15 15:06:03 2012 -0700
@@ -34,10 +34,6 @@
 public interface PipelineHelper<P_IN, P_OUT> {
 
     /**
-     * // @@@ Should this take StreamAccessor as an argument?
-     * //     Currently the PipelineHelper is tied to the StreamAccessor
-     * // @@@ If the evaluation of flags is changed it might be possible to compose
-     *        the stream flags with the aggregated ops flags
      *
      * @return the resultant flags of all source and the operations.
      */
@@ -46,34 +42,22 @@
     /**
      * Evaluate the pipeline and return the result of evaluation.
      *
-     * @param sa the source of elements.
      * @param sink the terminal sink, the last in the sink chain that produces a result.
      * @param <R> the result type.
      * @return the result.
      */
-    <R> R evaluateSequential(StreamAccessor<P_IN> sa, TerminalSink<P_OUT, R> sink) default {
-        into(sa, sink);
+    <R> R evaluateSequential(TerminalSink<P_OUT, R> sink) default {
+        into(sink);
         return sink.getAndClearState();
     }
 
     /**
      * Create a sink chain and push elements from the source into that chain.
      *
-     * @param sa the source of elements input to the sink chain.
      * @param sink the sink that is the last in the sink chain.
      */
-    void into(StreamAccessor<P_IN> sa, Sink<P_OUT, ?, ?> sink) default {
-        sa.into(wrapSink(sink));
-    }
-
-    /**
-     * Create an iterator chain.
-     *
-     * @param sa the source of the first iterator in the chain.
-     * @return the last iterator in the chain.
-     */
-    Iterator<P_OUT> iterator(StreamAccessor<P_IN> sa) default {
-        return wrapIterator(sa.iterator());
+    void into(Sink<P_OUT, ?, ?> sink) default {
+        getStreamAccessor().into(wrapSink(sink));
     }
 
     /**
@@ -85,10 +69,23 @@
     Sink<P_IN, ?, ?> wrapSink(Sink<P_OUT, ?, ?> sink);
 
     /**
-     * Create an interator chain.
+     * Create an iterator chain.
      *
      * @param source the first iterator in the chain that iterates over input elements.
      * @return the last iterator in the chain that iterates over output elements.
      */
     Iterator<P_OUT> wrapIterator(Iterator<P_IN> source);
+
+    /**
+     * Create an iterator chain, starting with the stream source and wrapping it with each stage.
+     *
+     * @return the last iterator in the chain.
+     */
+    Iterator<P_OUT> iterator();
+
+    /**
+     *
+     * @return the associated stream accessor.
+     */
+    StreamAccessor<P_IN> getStreamAccessor();
 }
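
With into(Sink) and evaluateSequential(TerminalSink) defaulting through getStreamAccessor(), a terminal op that produces its result via a TerminalSink reduces to a single delegating method; the sketch below mirrors FoldOp, GroupByOp, ReduceByOp and ToArrayOp later in this diff (T, U and sink() belong to the op, as in FoldOp):

    @Override
    public <S> U evaluateSequential(PipelineHelper<S, T> helper) {
        // default chain: evaluateSequential(sink) -> into(sink) -> getStreamAccessor().into(wrapSink(sink))
        return helper.evaluateSequential(sink());
    }
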
--- a/src/share/classes/java/util/streams/ops/AbstractTask.java	Mon Oct 15 14:05:16 2012 -0700
+++ b/src/share/classes/java/util/streams/ops/AbstractTask.java	Mon Oct 15 15:06:03 2012 -0700
@@ -25,6 +25,7 @@
 package java.util.streams.ops;
 
 import java.util.concurrent.CountedCompleter;
+import java.util.streams.ParallelPipelineHelper;
 import java.util.streams.Spliterator;
 
 /**
@@ -46,10 +47,10 @@
     protected boolean isLeaf;
     private R rawResult;
 
-    protected AbstractTask(Spliterator<E> spliterator, long targetSize) {
+    protected AbstractTask(ParallelPipelineHelper<E, ?> helper) {
         super(null);
-        this.spliterator = spliterator;
-        this.targetSize = targetSize;
+        this.spliterator = helper.spliterator();
+        this.targetSize = helper.suggestTargetSize();
     }
 
     protected AbstractTask(T parent, Spliterator<E> spliterator) {
@@ -124,8 +125,8 @@
 abstract class ComparableTask<E, R, T extends ComparableTask<E,R,T>> extends AbstractTask<E,R,T> implements Comparable<T> {
     protected final int depth;
 
-    protected ComparableTask(Spliterator<E> spliterator, long targetSize) {
-        super(spliterator, targetSize);
+    protected ComparableTask(ParallelPipelineHelper<E, ?> helper) {
+        super(helper);
         depth = 0;
     }
 
--- a/src/share/classes/java/util/streams/ops/ConcatOp.java	Mon Oct 15 14:05:16 2012 -0700
+++ b/src/share/classes/java/util/streams/ops/ConcatOp.java	Mon Oct 15 15:06:03 2012 -0700
@@ -57,9 +57,9 @@
     }
 
     @Override
-    public <S> TreeUtils.Node<T> evaluateParallel(StreamAccessor<S> source, ParallelPipelineHelper<S, T> helper) {
+    public <S> TreeUtils.Node<T> evaluateParallel(ParallelPipelineHelper<S, T> helper) {
         // Get all stuff from upstream
-        TreeUtils.Node<T> upStreamNode = TreeUtils.collect(source.spliterator(), helper, false, false);
+        TreeUtils.Node<T> upStreamNode = TreeUtils.collect(helper, false);
 
         // Get stuff from concatenation
         TreeUtils.Node<T> concatStreamNode = computeParallelFromConcatenatingStream();
--- a/src/share/classes/java/util/streams/ops/CumulateOp.java	Mon Oct 15 14:05:16 2012 -0700
+++ b/src/share/classes/java/util/streams/ops/CumulateOp.java	Mon Oct 15 15:06:03 2012 -0700
@@ -113,18 +113,18 @@
     }
 
     @Override
-    public <S> TreeUtils.Node<T> evaluateParallel(StreamAccessor<S> source, ParallelPipelineHelper<S, T> helper) {
-        return helper.invoke(new CumulateTask<>(helper.suggestDepth(), source.spliterator(), op, helper));
+    public <S> TreeUtils.Node<T> evaluateParallel(ParallelPipelineHelper<S, T> helper) {
+        return helper.invoke(new CumulateTask<>(helper.spliterator(), op, helper));
     }
 
     private static class Problem<S, T> {
-        final int depth;
+        final long targetSize;
         final BinaryOperator<T> op;
         final ParallelPipelineHelper<S, T> helper;
         int pass = 0;
 
-        private Problem(int depth, BinaryOperator<T> op, ParallelPipelineHelper<S, T> helper) {
-            this.depth = depth;
+        private Problem(BinaryOperator<T> op, ParallelPipelineHelper<S, T> helper) {
+            this.targetSize = helper.suggestTargetSize();
             this.op = op;
             this.helper = helper;
         }
@@ -132,46 +132,45 @@
 
     private class CumulateTask<S> extends RecursiveTask<TreeUtils.Node<T>> {
         private final Problem<S, T> problem;
-        private final int depth;
-        private final Spliterator<S> source;
+        private final Spliterator<S> spliterator;
+        private final boolean isRoot;
         private CumulateTask<S> left, right;
         private StreamBuilder<T> leafData;
+        private boolean isLeaf;
         private T upward;
         private T downward;
         private boolean downwardZero = false;
         private StreamBuilder<T> result;
 
-        private CumulateTask(int depth,
-                             Spliterator<S> source,
+        private CumulateTask(Spliterator<S> spliterator,
                              BinaryOperator<T> op,
                              ParallelPipelineHelper<S, T> helper) {
-            this.depth = depth;
-            this.source = source;
-            this.problem = new Problem<>(depth, op, helper);
+            this.spliterator = spliterator;
+            this.problem = new Problem<>(op, helper);
+            this.isRoot = true;
         }
 
-        private CumulateTask(int depth, Spliterator<S> source, Problem<S, T> problem) {
-            this.depth = depth;
-            this.source = source;
+        private CumulateTask(Spliterator<S> spliterator, Problem<S, T> problem) {
+            this.spliterator = spliterator;
             this.problem = problem;
-        }
-
-        public boolean isRoot() {
-            return depth == problem.depth;
+            this.isRoot = false;
         }
 
         @Override
         protected TreeUtils.Node<T> compute() {
             switch (problem.pass) {
                 case 0:
-                    if (depth != 0) {
-                        left = new CumulateTask<>(depth - 1, source.split(), problem);
-                        right = new CumulateTask<>(depth - 1, source, problem);
+                    int remaining = spliterator.estimateSize();
+                    int naturalSplits = spliterator.getNaturalSplits();
+                    isLeaf = ((remaining <= problem.targetSize) && (remaining >= 0)) || (naturalSplits == 0);
+                    if (!isLeaf) {
+                        left = new CumulateTask<>(spliterator.split(), problem);
+                        right = new CumulateTask<>(spliterator, problem);
                         right.fork();
                         left.compute();
                         right.join();
                         upward = problem.op.operate(left.upward, right.upward);
-                        if (isRoot()) {
+                        if (isRoot) {
                             downwardZero = true;
                             problem.pass = 1;
                             return compute();
@@ -180,16 +179,16 @@
                     else {
                         leafData = StreamBuilders.make();
                         TerminalSink<T, T> terminalSink = wrapSink(leafData);
-                        problem.helper.into(source, terminalSink);
+                        problem.helper.into(spliterator, terminalSink);
                         upward = terminalSink.getAndClearState();
                         // Special case -- if problem.depth == 0, just wrap the result and be done
-                        if (isRoot())
+                        if (isRoot)
                             return TreeUtils.node(leafData);
                     }
                     return null;
 
                 case 1:
-                    if (depth != 0) {
+                    if (!isLeaf) {
                         left.reinitialize();
                         right.reinitialize();
                         if (downwardZero) {
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/share/classes/java/util/streams/ops/EvaluableOp.java	Mon Oct 15 15:06:03 2012 -0700
@@ -0,0 +1,61 @@
+/*
+ * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.  Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+package java.util.streams.ops;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import java.util.streams.ParallelPipelineHelper;
+import java.util.streams.PipelineHelper;
+import java.util.streams.StreamAccessor;
+
+/**
+ * An Op whose results can be evaluated directly
+ *
+ * @param <E_IN> Type of input elements.
+ * @param <R> Type of result.
+ * @author Brian Goetz
+ */
+public interface EvaluableOp<E_IN, R> {
+
+    /**
+     * Evaluate the result of the operation in parallel.
+     *
+     * @param helper the parallel pipeline helper describing the pipeline to evaluate.
+     * @param <P_IN> Type of elements input to the pipeline.
+     * @return the result of the operation.
+     */
+    <P_IN> R evaluateParallel(ParallelPipelineHelper<P_IN, E_IN> helper) default {
+        Logger.getLogger(getClass().getName()).log(Level.WARNING, "{0} using evaluateParallel serial default", getClass().getSimpleName());
+        return evaluateSequential(helper);
+    }
+
+    /**
+     * Evaluate the result of the operation sequentially.
+     *
+     * @param helper the pipeline helper describing the pipeline to evaluate.
+     * @return the result of the operation.
+     */
+    <P_IN> R evaluateSequential(PipelineHelper<P_IN, E_IN> helper);
+}
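
A minimal implementer of the new interface, written against the prototype API in this changeset (HypotheticalSumOp is illustrative only; it needs java.util.Iterator plus the java.util.streams types, and it deliberately leaves evaluateParallel to the warn-and-fall-back default above):

    class HypotheticalSumOp implements EvaluableOp<Integer, Long> {
        @Override
        public <P_IN> Long evaluateSequential(PipelineHelper<P_IN, Integer> helper) {
            long sum = 0;
            // helper.iterator() wraps the stream source with every intermediate stage
            for (Iterator<Integer> it = helper.iterator(); it.hasNext(); ) {
                sum += it.next();
            }
            return sum;
        }
        // No evaluateParallel override: handed a ParallelPipelineHelper, the default above
        // logs a WARNING and returns the sequential result.
    }
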
--- a/src/share/classes/java/util/streams/ops/FindAnyOp.java	Mon Oct 15 14:05:16 2012 -0700
+++ b/src/share/classes/java/util/streams/ops/FindAnyOp.java	Mon Oct 15 15:06:03 2012 -0700
@@ -48,8 +48,8 @@
     }
 
     @Override
-    public <S> Optional<T> evaluateSequential(StreamAccessor<S> source, PipelineHelper<S, T> helper) {
-        return evaluate(helper.iterator(source));
+    public <S> Optional<T> evaluateSequential(PipelineHelper<S, T> helper) {
+        return evaluate(helper.iterator());
     }
 
     // For testing purposes
--- a/src/share/classes/java/util/streams/ops/FindFirstOp.java	Mon Oct 15 14:05:16 2012 -0700
+++ b/src/share/classes/java/util/streams/ops/FindFirstOp.java	Mon Oct 15 15:06:03 2012 -0700
@@ -48,8 +48,8 @@
     }
 
     @Override
-    public <S> Optional<T> evaluateSequential(StreamAccessor<S> source, PipelineHelper<S, T> helper) {
-        return evaluate(helper.iterator(source));
+    public <S> Optional<T> evaluateSequential(PipelineHelper<S, T> helper) {
+        return evaluate(helper.iterator());
     }
 
     // For testing purposes
--- a/src/share/classes/java/util/streams/ops/FoldOp.java	Mon Oct 15 14:05:16 2012 -0700
+++ b/src/share/classes/java/util/streams/ops/FoldOp.java	Mon Oct 15 15:06:03 2012 -0700
@@ -50,8 +50,8 @@
         this(()->seed, reducer, combiner);
     }
 
-    public <S> U evaluateSequential(StreamAccessor<S> source, PipelineHelper<S, T> helper) {
-        return helper.evaluateSequential(source, sink());
+    public <S> U evaluateSequential(PipelineHelper<S, T> helper) {
+        return helper.evaluateSequential(sink());
     }
 
     protected TerminalSink<T, U> sink() {
@@ -81,8 +81,8 @@
     }
 
     @Override
-    public <S> U evaluateParallel(StreamAccessor<S> source, ParallelPipelineHelper<S, T> helper) {
-        ReduceTask<S, T, U> task = new ReduceTask<>(source.spliterator(), helper.suggestTargetSize(), helper, this);
+    public <S> U evaluateParallel(ParallelPipelineHelper<S, T> helper) {
+        ReduceTask<S, T, U> task = new ReduceTask<>(helper, this);
         helper.invoke(task);
         return task.getRawResult();
     }
@@ -91,8 +91,8 @@
         private final ParallelPipelineHelper<S, T> helper;
         private final FoldOp<T,U> op;
 
-        private ReduceTask(Spliterator<S> spliterator, long targetSize, ParallelPipelineHelper<S, T> helper, FoldOp<T, U> op) {
-            super(spliterator, targetSize);
+        private ReduceTask(ParallelPipelineHelper<S, T> helper, FoldOp<T, U> op) {
+            super(helper);
             this.helper = helper;
             this.op = op;
         }
--- a/src/share/classes/java/util/streams/ops/ForEachOp.java	Mon Oct 15 14:05:16 2012 -0700
+++ b/src/share/classes/java/util/streams/ops/ForEachOp.java	Mon Oct 15 15:06:03 2012 -0700
@@ -88,28 +88,21 @@
     }
 
     @Override
-    public <S> Void evaluateSequential(StreamAccessor<S> source, PipelineHelper<S, T> helper) {
-        return helper.evaluateSequential(source, sink);
+    public <S> Void evaluateSequential(PipelineHelper<S, T> helper) {
+        return helper.evaluateSequential(sink);
     }
 
     @Override
-    public <S> Void evaluateParallel(StreamAccessor<S> source, ParallelPipelineHelper<S, T> helper) {
-        Spliterator<S> spliterator = source.spliterator();
-        Sink<S, ?, ?> compoundSink = helper.wrapSink(sink);
-        if (helper.suggestDepth() == 0) {
-            spliterator.into(compoundSink);
-        } else {
-            helper.invoke(new ForEachTask<>(spliterator, helper.suggestTargetSize(), compoundSink));
-        }
-        return null;
+    public <S> Void evaluateParallel(ParallelPipelineHelper<S, T> helper) {
+        return helper.invoke(new ForEachTask<>(helper, helper.wrapSink(sink)));
     }
 
     // @@@ Extending AbstractTask here is probably inefficient, since we don't really need to keep track of the structure of the computation tree
     private static class ForEachTask<S> extends AbstractTask<S, Void, ForEachTask<S>> {
         private final Sink<S, ?, ?> sink;
 
-        private ForEachTask(Spliterator<S> spliterator, long targetSize, Sink<S, ?, ?> sink) {
-            super(spliterator, targetSize);
+        private ForEachTask(ParallelPipelineHelper<S, ?> helper, Sink<S, ?, ?> sink) {
+            super(helper);
             this.sink = sink;
         }
 
--- a/src/share/classes/java/util/streams/ops/GroupByOp.java	Mon Oct 15 14:05:16 2012 -0700
+++ b/src/share/classes/java/util/streams/ops/GroupByOp.java	Mon Oct 15 15:06:03 2012 -0700
@@ -29,8 +29,6 @@
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountedCompleter;
-import java.util.functions.Factory;
 import java.util.functions.Mapper;
 import java.util.streams.*;
 
@@ -85,13 +83,13 @@
     }
 
     @Override
-    public <S> Map<K, Collection<T>> evaluateSequential(StreamAccessor<S> source, PipelineHelper<S, T> helper) {
-        return helper.evaluateSequential(source, sink());
+    public <S> Map<K, Collection<T>> evaluateSequential(PipelineHelper<S, T> helper) {
+        return helper.evaluateSequential(sink());
     }
 
     @Override
-    public <S> Map<K, Collection<T>> evaluateParallel(StreamAccessor<S> source, ParallelPipelineHelper<S, T> helper) {
-        final ConcurrentHashMap<K, StreamBuilder<T>> map = new ConcurrentHashMap();
+    public <S> Map<K, Collection<T>> evaluateParallel(ParallelPipelineHelper<S, T> helper) {
+        final ConcurrentHashMap<K, StreamBuilder<T>> map = new ConcurrentHashMap<>();
 
         // Cache the sink chain, so it can be reused by all F/J leaf tasks
         Sink<S, ?, ?> sinkChain = helper.wrapSink(new Sink.OfValue<T>() {
@@ -105,7 +103,7 @@
             }
         });
 
-        GroupByTask<S, T> task = new GroupByTask<>(source.spliterator(), helper, sinkChain);
+        GroupByTask<S, T> task = new GroupByTask<>(helper, sinkChain);
         helper.invoke(task);
 
         // @@@ Fragile cast, need a better way to switch StreamBuilder into Collection
@@ -119,8 +117,8 @@
 
         private final Sink<S, ?, ?> sinkChain;
 
-        private GroupByTask(Spliterator<S> spliterator, ParallelPipelineHelper<S, T> helper, Sink<S, ?, ?> sinkChain) {
-            super(spliterator, helper.suggestTargetSize());
+        private GroupByTask(ParallelPipelineHelper<S, T> helper, Sink<S, ?, ?> sinkChain) {
+            super(helper);
             this.helper = helper;
             this.sinkChain = sinkChain;
         }
--- a/src/share/classes/java/util/streams/ops/LimitOp.java	Mon Oct 15 14:05:16 2012 -0700
+++ b/src/share/classes/java/util/streams/ops/LimitOp.java	Mon Oct 15 15:06:03 2012 -0700
@@ -65,9 +65,9 @@
     }
 
     @Override
-    public <S> TreeUtils.Node<T> evaluateParallel(StreamAccessor<S> source, ParallelPipelineHelper<S, T> helper) {
+    public <S> TreeUtils.Node<T> evaluateParallel(ParallelPipelineHelper<S, T> helper) {
         // Dumb serial implementation deferring to iterator
-        final Iterator<T> i = wrapIterator(helper.iterator(source));
+        final Iterator<T> i = wrapIterator(helper.iterator());
 
         // @@@ if limit is small enough can use fixed size builder
         final StreamBuilder<T> sb = StreamBuilders.make();
--- a/src/share/classes/java/util/streams/ops/MapLimitOp.java	Mon Oct 15 14:05:16 2012 -0700
+++ b/src/share/classes/java/util/streams/ops/MapLimitOp.java	Mon Oct 15 15:06:03 2012 -0700
@@ -61,9 +61,9 @@
     }
 
     @Override
-    public <S> TreeUtils.Node<Mapping<K, V>> evaluateParallel(StreamAccessor<S> source, ParallelPipelineHelper<S, Mapping<K, V>> helper) {
+    public <S> TreeUtils.Node<Mapping<K, V>> evaluateParallel(ParallelPipelineHelper<S, Mapping<K, V>> helper) {
         // Dumb serial implementation deferring to iterator
-        final MapIterator<K, V> i = wrapIterator(helper.iterator(source));
+        final MapIterator<K, V> i = (MapIterator<K,V>) helper.iterator();
 
         // @@@ if limit is small enough can use fixed size builder
         final StreamBuilder<Mapping<K, V>> sb = StreamBuilders.make();
--- a/src/share/classes/java/util/streams/ops/MatchOp.java	Mon Oct 15 14:05:16 2012 -0700
+++ b/src/share/classes/java/util/streams/ops/MatchOp.java	Mon Oct 15 15:06:03 2012 -0700
@@ -70,8 +70,8 @@
     }
 
     @Override
-    public <S> Boolean evaluateSequential(StreamAccessor<S> source, PipelineHelper<S, T> helper) {
-        return evaluate(helper.iterator(source));
+    public <S> Boolean evaluateSequential(PipelineHelper<S, T> helper) {
+        return evaluate(helper.iterator());
     }
 
     private Boolean evaluate(Iterator<T> iterator) {
@@ -79,14 +79,14 @@
     }
 
     @Override
-    public <S> Boolean evaluateParallel(StreamAccessor<S> source, ParallelPipelineHelper<S, T> helper) {
+    public <S> Boolean evaluateParallel(ParallelPipelineHelper<S, T> helper) {
         // Approach for parallel implementation:
         // - Decompose as per usual
         // - run match on leaf chunks, call result "b"
         // - if b == matchKind.shortCircuitOn, complete early and return b
         // - else if we complete normally, return !shortCircuitOn
 
-        MatchTask<S, T> task = new MatchTask<>(source.spliterator(), helper.suggestTargetSize(), this, helper);
+        MatchTask<S, T> task = new MatchTask<>(this, helper);
         helper.invoke(task);
         return task.answer.get();
     }
@@ -96,8 +96,8 @@
         private final ParallelPipelineHelper<S, T> helper;
         private final AtomicReference<Boolean> answer;
 
-        private MatchTask(Spliterator<S> spliterator, long targetSize, MatchOp<T> op, ParallelPipelineHelper<S, T> helper) {
-            super(spliterator, targetSize);
+        private MatchTask(MatchOp<T> op, ParallelPipelineHelper<S, T> helper) {
+            super(helper);
             this.op = op;
             this.helper = helper;
             this.answer = new AtomicReference<>(null);
--- a/src/share/classes/java/util/streams/ops/ParallelOp.java	Mon Oct 15 14:05:16 2012 -0700
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,54 +0,0 @@
-/*
- * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved.
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This code is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 only, as
- * published by the Free Software Foundation.  Oracle designates this
- * particular file as subject to the "Classpath" exception as provided
- * by Oracle in the LICENSE file that accompanied this code.
- *
- * This code is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
- * version 2 for more details (a copy is included in the LICENSE file that
- * accompanied this code).
- *
- * You should have received a copy of the GNU General Public License version
- * 2 along with this work; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
- *
- * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
- * or visit www.oracle.com if you need additional information or have any
- * questions.
- */
-package java.util.streams.ops;
-
-import java.util.streams.ParallelPipelineHelper;
-import java.util.streams.StreamAccessor;
-
-/**
- * Parallel operation.
- *
- * @param <E_IN> Type of input elements.
- * @param <R> Type of result.
- * @author Brian Goetz
- */
-public interface ParallelOp<E_IN, R> {
-
-    /**
-     * Compute the result of the operation in parallel and return the result.
-     *
-     * @param helper
-     * @param <P_IN> Type of elements input to the pipeline.
-     * @return result of the operation.
-     */
-
-    /**
-     * Evaluate the result of the operation in parallel.
-     *
-     * @param <P_IN> Type of elements input to the pipeline.
-     * @return the result of the operation.
-     */
-    <P_IN> R evaluateParallel(StreamAccessor<P_IN> source, ParallelPipelineHelper<P_IN, E_IN> helper);
-}
--- a/src/share/classes/java/util/streams/ops/ReduceByOp.java	Mon Oct 15 14:05:16 2012 -0700
+++ b/src/share/classes/java/util/streams/ops/ReduceByOp.java	Mon Oct 15 15:06:03 2012 -0700
@@ -80,7 +80,7 @@
     }
 
     @Override
-    public <S> Map<U, W> evaluateSequential(StreamAccessor<S> source, PipelineHelper<S, T> helper) {
-        return helper.evaluateSequential(source, sink());
+    public <S> Map<U, W> evaluateSequential(PipelineHelper<S, T> helper) {
+        return helper.evaluateSequential(sink());
     }
 }
--- a/src/share/classes/java/util/streams/ops/StatefulOp.java	Mon Oct 15 14:05:16 2012 -0700
+++ b/src/share/classes/java/util/streams/ops/StatefulOp.java	Mon Oct 15 15:06:03 2012 -0700
@@ -1,7 +1,5 @@
 package java.util.streams.ops;
 
-import java.util.logging.Level;
-import java.util.logging.Logger;
 import java.util.streams.*;
 
 /**
@@ -12,8 +10,8 @@
  * @param <E_OUT> Type of output elements.
  *
  */
-public interface StatefulOp<E_IN, E_OUT> extends IntermediateOp<E_IN, E_OUT>, ParallelOp<E_IN, TreeUtils.Node<E_OUT>> {
-    static final Logger LOGGER = Logger.getLogger(StatefulOp.class.getName());
+public interface StatefulOp<E_IN, E_OUT>
+        extends IntermediateOp<E_IN, E_OUT>, EvaluableOp<E_IN, TreeUtils.Node<E_OUT>> {
 
     @Override
     public boolean isStateful() default {
@@ -21,12 +19,9 @@
     }
 
     @Override
-    <P_IN> TreeUtils.Node<E_OUT> evaluateParallel(StreamAccessor<P_IN> source, ParallelPipelineHelper<P_IN, E_IN> helper) default {
-       LOGGER.log(Level.WARNING, "{0} using StatefulOp.evaluateParallel sequential default", getClass().getSimpleName());
-
-        // dumb default serial implementation
+    <P_IN> TreeUtils.Node<E_OUT> evaluateSequential(PipelineHelper<P_IN, E_IN> helper) default {
         final StreamBuilder<E_OUT> sb = StreamBuilders.make();
-        helper.into(source, wrapSink(sb));
+        helper.into(wrapSink(sb));
         return TreeUtils.node(sb);
     }
 }
--- a/src/share/classes/java/util/streams/ops/TerminalOp.java	Mon Oct 15 14:05:16 2012 -0700
+++ b/src/share/classes/java/util/streams/ops/TerminalOp.java	Mon Oct 15 15:06:03 2012 -0700
@@ -24,12 +24,6 @@
  */
 package java.util.streams.ops;
 
-
-import java.util.logging.Level;
-import java.util.logging.Logger;
-import java.util.streams.ParallelPipelineHelper;
-import java.util.streams.PipelineHelper;
-import java.util.streams.StreamAccessor;
 import java.util.streams.StreamShape;
 
 /**
@@ -39,21 +33,7 @@
  * @param <R>    The type of the result.
  * @author Brian Goetz
  */
-public interface TerminalOp<E_IN, R> extends ParallelOp<E_IN, R> {
-
-    /**
-     * Evaluate the result of the operation sequentially.
-     *
-     * @param helper
-     * @return the result of the operation.
-     */
-    <P_IN> R evaluateSequential(StreamAccessor<P_IN> source, PipelineHelper<P_IN, E_IN> helper);
-
-    @Override
-    <P_IN> R evaluateParallel(StreamAccessor<P_IN> source, ParallelPipelineHelper<P_IN, E_IN> helper) default {
-        Logger.getLogger(getClass().getName()).log(Level.WARNING, "{0} using TerminalOp.computeParallel serial default", getClass().getSimpleName());
-        return evaluateSequential(source, helper);
-    }
+public interface TerminalOp<E_IN, R> extends EvaluableOp<E_IN, R> {
 
     boolean isShortCircuit() default { return false; }
 
--- a/src/share/classes/java/util/streams/ops/ToArrayOp.java	Mon Oct 15 14:05:16 2012 -0700
+++ b/src/share/classes/java/util/streams/ops/ToArrayOp.java	Mon Oct 15 15:06:03 2012 -0700
@@ -27,7 +27,6 @@
 import java.util.Arrays;
 import java.util.streams.ParallelPipelineHelper;
 import java.util.streams.PipelineHelper;
-import java.util.streams.StreamAccessor;
 import java.util.streams.TerminalSink;
 
 /**
@@ -85,13 +84,13 @@
     }
 
     @Override
-    public <S> Object[] evaluateSequential(StreamAccessor<S> source, PipelineHelper<S, T> helper) {
-        return helper.evaluateSequential(source, sink());
+    public <S> Object[] evaluateSequential(PipelineHelper<S, T> helper) {
+        return helper.evaluateSequential(sink());
     }
 
     @Override
-    public <P_IN> Object[] evaluateParallel(StreamAccessor<P_IN> source, ParallelPipelineHelper<P_IN, T> helper) {
-        TreeUtils.Node<T> node = TreeUtils.collect(source.spliterator(), helper, false, false);
+    public <P_IN> Object[] evaluateParallel(ParallelPipelineHelper<P_IN, T> helper) {
+        TreeUtils.Node<T> node = TreeUtils.collect(helper, false);
         @SuppressWarnings("unchecked")
         T[] array = (T[]) new Object[node.size()];
 
--- a/src/share/classes/java/util/streams/ops/TreeUtils.java	Mon Oct 15 14:05:16 2012 -0700
+++ b/src/share/classes/java/util/streams/ops/TreeUtils.java	Mon Oct 15 15:06:03 2012 -0700
@@ -39,14 +39,14 @@
         throw new Error("no instances");
     }
 
-    public static <P_IN, P_OUT> Node<P_OUT> collect(Spliterator<P_IN> spliterator,
-                                                    ParallelPipelineHelper<P_IN, P_OUT> helper,
-                                                    boolean flattenLeaves,
+    public static <P_IN, P_OUT> Node<P_OUT> collect(ParallelPipelineHelper<P_IN, P_OUT> helper,
                                                     boolean flattenTree) {
-        int depth = helper.suggestDepth();
+        Spliterator<P_IN> spliterator = helper.spliterator();
         int size = spliterator.getSizeIfKnown();
+        boolean noSplit = ((size <= helper.suggestTargetSize()) && (size >= 0))
+                          || (spliterator.getNaturalSplits() == 0);
         boolean splitSizesKnown = (helper.getFlags() & Stream.FLAG_SIZED) != 0;
-        if (depth == 0) {
+        if (noSplit) {
             StreamBuilder<P_OUT> builder;
             if (size >= 0 && splitSizesKnown) {
                 builder = StreamBuilders.makeFixed(size);
@@ -63,7 +63,7 @@
                 helper.invoke(new SizedCollectorTask<>(spliterator, helper, helper.suggestTargetSize(), array));
                 return node(array);
             } else {
-                CollectorTask<P_IN, P_OUT> task = new CollectorTask<>(spliterator, helper.suggestTargetSize(), helper);
+                CollectorTask<P_IN, P_OUT> task = new CollectorTask<>(helper);
                 helper.invoke(task);
                 Node<P_OUT> node = task.getRawResult();
                 if (flattenTree) {
@@ -85,21 +85,21 @@
         }
 
         @Override
-        public <P_IN> Node<E_IN> evaluateSequential(StreamAccessor<P_IN> source, PipelineHelper<P_IN, E_IN> helper) {
+        public <P_IN> Node<E_IN> evaluateSequential(PipelineHelper<P_IN, E_IN> helper) {
             throw new UnsupportedOperationException();
         }
 
         @Override
-        public <P_IN> TreeUtils.Node<E_IN> evaluateParallel(StreamAccessor<P_IN> source, ParallelPipelineHelper<P_IN, E_IN> helper) {
-            return TreeUtils.collect(source.spliterator(), helper, false, false);
+        public <P_IN> TreeUtils.Node<E_IN> evaluateParallel(ParallelPipelineHelper<P_IN, E_IN> helper) {
+            return TreeUtils.collect(helper, false);
         }
     }
 
     private static class CollectorTask<T, U> extends AbstractTask<T, Node<U>, CollectorTask<T,U>> {
         private final ParallelPipelineHelper<T, U> helper;
 
-        private CollectorTask(Spliterator<T> spliterator, long targetSize, ParallelPipelineHelper<T, U> helper) {
-            super(spliterator, targetSize);
+        private CollectorTask(ParallelPipelineHelper<T, U> helper) {
+            super(helper);
             this.helper = helper;
         }