changeset 7156:99b1d495328b

- remove PipelineHelper.wrapSequential and use in SliceOp, which now defers to re-using the sink when the leaf task is also the root task. - SliceOp only needs to visit the child nodes and discard/truncate when the absolute size information is known, which can only be the case when the root task is completed. - removed functionality in OpUtils that duplicates that in PipelineHelper.
author psandoz
date Fri, 25 Jan 2013 10:02:12 +0100
parents 703ad3145ab2
children b624065ad930
files src/share/classes/java/util/stream/AbstractPipeline.java src/share/classes/java/util/stream/NodeUtils.java src/share/classes/java/util/stream/OpUtils.java src/share/classes/java/util/stream/PipelineHelper.java src/share/classes/java/util/stream/SliceOp.java
diffstat 5 files changed, 53 insertions(+), 126 deletions(-) [+]
line wrap: on
line diff
--- a/src/share/classes/java/util/stream/AbstractPipeline.java	Thu Jan 24 16:19:28 2013 -0800
+++ b/src/share/classes/java/util/stream/AbstractPipeline.java	Fri Jan 25 10:02:12 2013 +0100
@@ -280,14 +280,6 @@
         }
 
         @Override
-        @SuppressWarnings("unchecked")
-        public Spliterator<E_OUT> wrapSequential(Spliterator<P_IN> sp) {
-            return depth == 0
-                   ? (Spliterator<E_OUT>) sp
-                   : AbstractPipeline.this.wrap(this, sp, false);
-        }
-
-        @Override
         public NodeFactory<E_OUT> getOutputNodeFactory() {
             return (NodeFactory<E_OUT>) terminalShape.nodeFactory();
         }
--- a/src/share/classes/java/util/stream/NodeUtils.java	Thu Jan 24 16:19:28 2013 -0800
+++ b/src/share/classes/java/util/stream/NodeUtils.java	Fri Jan 25 10:02:12 2013 +0100
@@ -185,8 +185,7 @@
         protected Node<U> doLeaf() {
             Node.Builder<U> builder = Nodes.makeBuilder(helper.getOutputSizeIfKnown() >= 0 ? spliterator.exactSizeIfKnown() : -1,
                                                         helper.arrayGenerator());
-            OpUtils.intoWrapped(spliterator, helper.wrapSink(builder));
-            return builder.build();
+            return helper.into(builder, spliterator).build();
         }
 
         @Override
@@ -241,7 +240,7 @@
             if (!AbstractTask.suggestSplit(helper, spliterator, targetSize) || ((split = spliterator.trySplit()) == null)) {
                 if (offset+length >= Streams.MAX_ARRAY_SIZE)
                     throw new IllegalArgumentException("Stream size exceeds max array size");
-                OpUtils.intoUnwrapped(helper, spliterator, new ArraySink<>(array, (int) offset, (int) length));
+                helper.into(new ArraySink<>(array, (int) offset, (int) length), spliterator);
                 tryComplete();
             }
             else {
@@ -348,8 +347,7 @@
         @Override
         protected Node.OfPrimitive<Integer> doLeaf() {
             Node.OfPrimitive.Builder<Integer> builder = Nodes.intMakeBuilder(helper.getOutputSizeIfKnown() >= 0 ? spliterator.exactSizeIfKnown() : -1);
-            OpUtils.intoWrapped(spliterator, helper.wrapSink(builder));
-            return builder.build();
+            return helper.into(builder, spliterator).build();
         }
 
         @Override
@@ -404,7 +402,7 @@
             if (!AbstractTask.suggestSplit(helper, spliterator, targetSize) || ((split = spliterator.trySplit()) == null)) {
                 if (offset+length >= Streams.MAX_ARRAY_SIZE)
                     throw new IllegalArgumentException("Stream size exceeds max array size");
-                OpUtils.intoUnwrapped(helper, spliterator, new IntArraySink(array, (int) offset, (int) length));
+                helper.into(new IntArraySink(array, (int) offset, (int) length), spliterator);
                 tryComplete();
             }
             else {
@@ -508,8 +506,7 @@
         @Override
         protected Node.OfPrimitive<Long> doLeaf() {
             Node.OfPrimitive.Builder<Long> builder = Nodes.longMakeBuilder(helper.getOutputSizeIfKnown() >= 0 ? spliterator.exactSizeIfKnown() : -1);
-            OpUtils.intoWrapped(spliterator, helper.wrapSink(builder));
-            return builder.build();
+            return helper.into(builder, spliterator).build();
         }
 
         @Override
@@ -564,7 +561,7 @@
             if (!AbstractTask.suggestSplit(helper, spliterator, targetSize) || ((split = spliterator.trySplit()) == null)) {
                 if (offset+length >= Streams.MAX_ARRAY_SIZE)
                     throw new IllegalArgumentException("Stream size exceeds max array size");
-                OpUtils.intoUnwrapped(helper, spliterator, new LongArraySink(array, (int) offset, (int) length));
+                helper.into(new LongArraySink(array, (int) offset, (int) length), spliterator);
                 tryComplete();
             }
             else {
@@ -668,8 +665,7 @@
         @Override
         protected Node.OfPrimitive<Double> doLeaf() {
             Node.OfPrimitive.Builder<Double> builder = Nodes.doubleMakeBuilder(helper.getOutputSizeIfKnown() >= 0 ? spliterator.exactSizeIfKnown() : -1);
-            OpUtils.intoWrapped(spliterator, helper.wrapSink(builder));
-            return builder.build();
+            return helper.into(builder, spliterator).build();
         }
 
         @Override
@@ -724,7 +720,7 @@
             if (!AbstractTask.suggestSplit(helper, spliterator, targetSize) || ((split = spliterator.trySplit()) == null)) {
                 if (offset+length >= Streams.MAX_ARRAY_SIZE)
                     throw new IllegalArgumentException("Stream size exceeds max array size");
-                OpUtils.intoUnwrapped(helper, spliterator, new DoubleArraySink(array, (int) offset, (int) length));
+                helper.into(new DoubleArraySink(array, (int) offset, (int) length), spliterator);
                 tryComplete();
             }
             else {
--- a/src/share/classes/java/util/stream/OpUtils.java	Thu Jan 24 16:19:28 2013 -0800
+++ b/src/share/classes/java/util/stream/OpUtils.java	Fri Jan 25 10:02:12 2013 +0100
@@ -38,33 +38,6 @@
         throw new IllegalStateException("no instances");
     }
 
-    /**
-     * Accepts a Sink, wraps it with upstream stages (see {@link java.util.stream.PipelineHelper#wrapSink(Sink)}, and
-     * then push all elements obtained from the Spliterator into that wrapped Sink.
-     * <p>This method performs the equivalent of <code>intoWrapped(sp, helper.wrapSink(sink))</code>.</p>
-     *
-     * @param helper the pipeline helper.
-     * @param sp   the source of elements to push into the wrapped sink.
-     * @param sink the sink in which to wrap.
-     */
-    public static<P_IN, P_OUT, S extends Sink<P_OUT>>
-    S intoUnwrapped(PipelineHelper<P_IN, P_OUT> helper, Spliterator<P_IN> sp, S sink) {
-        intoWrapped(sp, helper.wrapSink(sink));
-        return sink;
-    }
-
-    /**
-     * Push all elements obtained from a spliterator into a sink.
-     *
-     * @param sp   the source of elements to push into the sink
-     * @param sink the sink that accepts elements.
-     */
-    public static<P_IN> void intoWrapped(Spliterator<P_IN> sp, Sink<P_IN> sink) {
-        sink.begin(sp.exactSizeIfKnown());
-        sp.forEach(sink);
-        sink.end();
-    }
-
     public static<P_IN, P_OUT> void parallelForEach(PipelineHelper<P_IN, P_OUT> helper,
                                                     Sink<P_IN> sink) {
         new ForEachTask<>(helper, sink).invoke();
@@ -139,7 +112,7 @@
 
         @Override
         protected S doLeaf() {
-            return intoUnwrapped(helper, spliterator, sinkFactory.get());
+            return helper.into(sinkFactory.get(), spliterator);
         }
 
         @Override
@@ -157,56 +130,4 @@
             }
         }
     }
-
-    public static class ValueHolder<T> implements Block<T>, Supplier<T> {
-        private T value;
-
-        @Override
-        public void accept(T t) {
-            this.value = t;
-        }
-
-        @Override
-        public T get() {
-            return value;
-        }
-    }
-
-    public static class PrimitiveValueHolder {
-        public static class OfInt implements IntBlock, IntSupplier {
-            private int value;
-
-            public int getAsInt() {
-                return value;
-            }
-
-            public void accept(int value) {
-                this.value = value;
-            }
-        }
-
-        public static class OfLong implements LongBlock, LongSupplier {
-            private long value;
-
-            public long getAsLong() {
-                return value;
-            }
-
-            public void accept(long value) {
-                this.value = value;
-            }
-        }
-
-        public static class OfDouble implements DoubleBlock, DoubleSupplier {
-            private double value;
-
-            public double getAsDouble() {
-                return value;
-            }
-
-            public void accept(double value) {
-                this.value = value;
-            }
-        }
-    }
 }
--- a/src/share/classes/java/util/stream/PipelineHelper.java	Thu Jan 24 16:19:28 2013 -0800
+++ b/src/share/classes/java/util/stream/PipelineHelper.java	Fri Jan 25 10:02:12 2013 +0100
@@ -102,15 +102,6 @@
     Sink<P_IN> wrapSink(Sink<P_OUT> sink);
 
     /**
-     * Take a spliterator for a source chunk, and create a spliterator
-     * incorporating all the stages suitable for sequential traversal
-     *
-     * @param source the spliterator that is the source of input elements.
-     * @return the iterator over the sink chain.
-     */
-    Spliterator<P_OUT> wrapSequential(Spliterator<P_IN> source);
-
-    /**
      * Get the node factory corresponding to the output and shape of this pipeline.
      *
      * @return the node factory.
--- a/src/share/classes/java/util/stream/SliceOp.java	Thu Jan 24 16:19:28 2013 -0800
+++ b/src/share/classes/java/util/stream/SliceOp.java	Fri Jan 25 10:02:12 2013 +0100
@@ -25,6 +25,7 @@
 package java.util.stream;
 
 import java.util.ArrayList;
+import java.util.List;
 import java.util.Spliterator;
 import java.util.concurrent.CountedCompleter;
 
@@ -190,6 +191,23 @@
     }
 
     @Override
+    public <S> Node<T> evaluateSequential(PipelineHelper<S, T> helper) {
+        // @@@ If the input size is known then it is possible to know the output
+        //     size given skip and limit (see getFinalSize).
+        //     However it is not possible to make a builder of a fixed size since
+        //     it is assumed that fixed size is the same as the known input size.
+        Node.Builder<T> nb = outputShape().nodeFactory().makeNodeBuilder(-1, helper.arrayGenerator());
+        Sink<S> wrappedSink = helper.wrapSink(wrapSink(helper.getStreamAndOpFlags(), nb));
+        Spliterator<S> spliterator = helper.sourceSpliterator();
+
+        wrappedSink.begin(spliterator.exactSizeIfKnown());
+        helper.getInputShape().forEachWithCancel(spliterator, wrappedSink);
+        wrappedSink.end();
+
+        return nb.build();
+    }
+
+    @Override
     public <S> Node<T> evaluateParallel(PipelineHelper<S, T> helper) {
         // Parallel strategy -- two cases
         // IF we have full size information
@@ -210,7 +228,7 @@
 //        if (size >= 0 && helper.getOutputSizeIfKnown() == size && spliterator.isPredictableSplits())
 //            return helper.invoke(new SizedSliceTask<>(helper, skip, getFinalSize(helper)));
 //        else
-            return new SliceTask<>(helper, skip, limit).invoke();
+              return new SliceTask<>(this, helper, skip, limit).invoke();
     }
 
     @Override
@@ -219,19 +237,22 @@
     }
 
     private static class SliceTask<S, T> extends AbstractShortCircuitTask<S, T, Node<T>, SliceTask<S, T>> {
+        private final SliceOp<T> op;
         private final long targetOffset, targetSize;
         private long thisNodeSize;
 
         private volatile boolean completed;
 
-        private SliceTask(PipelineHelper<S, T> helper, long offset, long size) {
+        private SliceTask(SliceOp<T> op, PipelineHelper<S, T> helper, long offset, long size) {
             super(helper);
+            this.op = op;
             this.targetOffset = offset;
             this.targetSize = size;
         }
 
         private SliceTask(SliceTask<S, T> parent, Spliterator<S> spliterator) {
             super(parent, spliterator);
+            this.op = parent.op;
             this.targetOffset = parent.targetOffset;
             this.targetSize = parent.targetSize;
         }
@@ -249,12 +270,11 @@
         @Override
         protected final Node<T> doLeaf() {
             if (isRoot()) {
-                Spliterator<T> wrapped = helper.wrapSequential(spliterator);
-                return helper.getOutputNodeFactory().truncateToNode(-1, wrapped, targetOffset, targetSize, helper.arrayGenerator());
+                return op.evaluateSequential(helper);
             }
             else {
-                Node<T> node = OpUtils.intoUnwrapped(helper, spliterator,
-                                                     helper.getOutputNodeFactory().makeNodeBuilder(-1, helper.arrayGenerator())).build();
+                Node<T> node = helper.into(helper.getOutputNodeFactory().makeNodeBuilder(-1, helper.arrayGenerator()),
+                                           spliterator).build();
                 thisNodeSize = node.count();
                 completed = true;
                 return node;
@@ -268,16 +288,23 @@
                 for (SliceTask<S, T> child = children; child != null; child = child.nextSibling)
                     thisNodeSize += child.thisNodeSize;
                 completed = true;
-                ArrayList<Node<T>> nodes = new ArrayList<>();
-                visit(nodes, 0);
-                Node<T> result;
-                if (nodes.size() == 0)
-                    result = helper.getOutputNodeFactory().emptyNode();
-                else if (nodes.size() == 1)
-                    result = nodes.get(0);
-                else
-                    result = helper.getOutputNodeFactory().concNodes(nodes);
-                setLocalResult(result);
+
+                if (isRoot()) {
+                    // Only collect nodes once absolute size information is known
+
+                    ArrayList<Node<T>> nodes = new ArrayList<>();
+                    visit(nodes, 0);
+                    Node<T> result;
+                    if (nodes.size() == 0)
+                        result = helper.getOutputNodeFactory().emptyNode();
+                    else if (nodes.size() == 1)
+                        result = nodes.get(0);
+                    else
+                        // This will create a tree of depth 1 and will not be a sub-tree
+                        // for leaf nodes within the require range
+                        result = helper.getOutputNodeFactory().concNodes(nodes);
+                    setLocalResult(result);
+                }
             }
             if (targetSize >= 0) {
                 if (((SliceTask<S,T>) getRoot()).leftSize() >= targetOffset + targetSize)
@@ -304,7 +331,7 @@
             }
         }
 
-        private void visit(ArrayList<Node<T>> results, int offset) {
+        private void visit(List<Node<T>> results, int offset) {
             if (!isLeaf()) {
                 for (SliceTask<S, T> child = children; child != null; child = child.nextSibling) {
                     child.visit(results, offset);