changeset 6397:55b0174eac03

Defer to the stream shape for the creation of a node builder holding elements of the required shape and to collect elements output from a pipeline into a node holding elements of the required shape.
author psandoz
date Wed, 14 Nov 2012 10:52:44 +0100
parents 38dc6745cd9b
children abcf56edb55c
files src/share/classes/java/util/streams/AbstractPipeline.java src/share/classes/java/util/streams/PipelineHelper.java src/share/classes/java/util/streams/StreamShape.java src/share/classes/java/util/streams/StreamShapeFactory.java src/share/classes/java/util/streams/ops/AbstractTask.java src/share/classes/java/util/streams/ops/IntermediateOp.java src/share/classes/java/util/streams/ops/Node.java src/share/classes/java/util/streams/ops/Nodes.java src/share/classes/java/util/streams/ops/OpUtils.java src/share/classes/java/util/streams/ops/TreeUtils.java
diffstat 10 files changed, 94 insertions(+), 43 deletions(-) [+]
line wrap: on
line diff
--- a/src/share/classes/java/util/streams/AbstractPipeline.java	Tue Nov 13 13:31:22 2012 +0100
+++ b/src/share/classes/java/util/streams/AbstractPipeline.java	Wed Nov 14 10:52:44 2012 +0100
@@ -88,7 +88,7 @@
     /**
      * If non-{@code null} then we are in sequential iteration mode.
      */
-    private Iterator<E_OUT> iterator;
+    protected Iterator<E_OUT> iterator;
 
     /**
      * Constructor for the element source of a pipeline.
@@ -189,7 +189,8 @@
                                    IntermediateOp[] ops, int from, int upTo,
                                    StreamOp<?, R> terminal) {
         return (R) terminal.evaluateParallel(new ParallelImplPipelineHelper(node, spliterator, sourceFlags,
-                                                                            opsFlags, ops, from, upTo));
+                                                                            opsFlags, ops, from, upTo,
+                                                                            terminal.inputShape()));
     }
 
     @SuppressWarnings("unchecked")
@@ -200,12 +201,13 @@
                                      IntermediateOp[] ops, int from, int upTo,
                                      TerminalOp<E_OUT, R> terminal) {
         return (R) terminal.evaluateSequential(new SequentialImplPipelineHelper(node, spliterator, sourceFlags,
-                                                                                opsFlags, ops, from, upTo));
+                                                                                opsFlags, ops, from, upTo,
+                                                                                terminal.inputShape()));
     }
 
     @SuppressWarnings("unchecked")
     protected <R> R evaluateSequential(TerminalOp<E_OUT, R> terminal, int sourceFlags) {
-        return (R) terminal.evaluateSequential(new SequentialImplPipelineHelperSource(sourceFlags));
+        return (R) terminal.evaluateSequential(new SequentialImplPipelineHelperSource(sourceFlags, terminal.inputShape()));
     }
 
     static abstract class AbstractPipelineHelper<P_IN, P_OUT> implements PipelineHelper<P_IN, P_OUT> {
@@ -217,12 +219,17 @@
         final int upTo;
         final int sourceFlags;
         final int sourceAndOpsFlags;
+        final StreamShape terminalShape;
 
-        AbstractPipelineHelper(Spliterator<P_IN> spliterator, int sourceFlags, int[] opsFlags, IntermediateOp[] ops, int from, int upTo) {
-            this(null, spliterator, sourceFlags, opsFlags, ops, from, upTo);
+        AbstractPipelineHelper(Spliterator<P_IN> spliterator, int sourceFlags,
+                               int[] opsFlags, IntermediateOp[] ops, int from, int upTo,
+                               StreamShape terminalShape) {
+            this(null, spliterator, sourceFlags, opsFlags, ops, from, upTo, terminalShape);
         }
 
-        AbstractPipelineHelper(Node<P_IN> node, Spliterator<P_IN> spliterator, int sourceFlags, int[] opsFlags, IntermediateOp[] ops, int from, int upTo) {
+        AbstractPipelineHelper(Node<P_IN> node, Spliterator<P_IN> spliterator, int sourceFlags,
+                               int[] opsFlags, IntermediateOp[] ops, int from, int upTo,
+                               StreamShape terminalShape) {
             this.node = node;
             this.spliterator = spliterator;
             this.opsFlags = opsFlags;
@@ -230,6 +237,7 @@
             this.from = from;
             this.upTo = upTo;
             this.sourceFlags = sourceFlags;
+            this.terminalShape = terminalShape;
 
             int flags = opsFlags[from] = StreamOpFlags.INITIAL_OPS_VALUE;
             for (int i = from; i < upTo; i++) {
@@ -251,6 +259,11 @@
             return upTo == from;
         }
 
+        protected NodeBuilder<P_OUT> makeNodeBuilder() {
+            // @@@ Unchecked
+            return terminalShape.makeNodeBuilder(getOutputSizeIfKnown());
+        }
+
         @Override
         public <S extends Sink<P_OUT>> S into(S sink) {
             Sink<P_IN> wrappedSink = wrapSink(sink);
@@ -297,14 +310,17 @@
 
     class SequentialImplPipelineHelper<P_IN> extends AbstractPipelineHelper<P_IN, E_OUT> {
 
-        SequentialImplPipelineHelper(Spliterator<P_IN> spliterator, int sourceFlags, IntermediateOp[] ops) {
+        SequentialImplPipelineHelper(Spliterator<P_IN> spliterator, int sourceFlags,
+                                     IntermediateOp[] ops,
+                                     StreamShape terminalShape) {
             // Start from 2nd element since 1st is the head containing the source
-            super(spliterator, sourceFlags, new int[ops.length + 1], ops, 0, ops.length);
+            super(spliterator, sourceFlags, new int[ops.length + 1], ops, 0, ops.length, terminalShape);
         }
 
         SequentialImplPipelineHelper(Node<P_IN> node, Spliterator<P_IN> spliterator, int sourceFlags,
-                                     int[] opsFlags, IntermediateOp[] ops, int from, int to) {
-            super(node, spliterator, sourceFlags, opsFlags, ops, from, to);
+                                     int[] opsFlags, IntermediateOp[] ops, int from, int to,
+                                     StreamShape terminalShape) {
+            super(node, spliterator, sourceFlags, opsFlags, ops, from, to, terminalShape);
         }
 
         boolean requirePull() {
@@ -329,10 +345,12 @@
         @Override
         public Node<E_OUT> collectOutput() {
             if (hasZeroDepth() && node != null) {
+                // @@@ Unchecked
                 return (Node<E_OUT>) node;
             }
-            else
-                return into(Nodes.<E_OUT>makeBuilder(getOutputSizeIfKnown())).build();
+            else {
+                return into(makeNodeBuilder()).build();
+            }
         }
 
         @Override
@@ -343,8 +361,8 @@
 
     class SequentialImplPipelineHelperSource extends SequentialImplPipelineHelper {
 
-        <R> SequentialImplPipelineHelperSource(int sourceFlags) {
-            super(AbstractPipeline.this.spliterator, sourceFlags, ops());
+        <R> SequentialImplPipelineHelperSource(int sourceFlags, StreamShape terminalShape) {
+            super(AbstractPipeline.this.spliterator, sourceFlags, ops(), terminalShape);
         }
 
         @Override
@@ -368,8 +386,9 @@
         final long targetSize;
 
         ParallelImplPipelineHelper(Node<P_IN> node, Spliterator<P_IN> spliterator, int sourceFlags,
-                                   int[] opsFlags, IntermediateOp[] ops, int from, int to) {
-            super(node, spliterator, sourceFlags, opsFlags, ops, from, to);
+                                   int[] opsFlags, IntermediateOp[] ops, int from, int to,
+                                   StreamShape terminalShape) {
+            super(node, spliterator, sourceFlags, opsFlags, ops, from, to, terminalShape);
             this.targetSize = calculateTargetSize();
         }
 
@@ -411,10 +430,13 @@
         @Override
         public Node<E_OUT> collectOutput() {
             if (hasZeroDepth() && node != null) {
+                // @@@ Unchecked
                 return (Node<E_OUT>) node;
             }
-            else
-                return TreeUtils.collect(this, false);
+            else {
+                // @@@ Unchecked
+                return terminalShape.collect(this, false);
+            }
         }
     }
 
--- a/src/share/classes/java/util/streams/PipelineHelper.java	Tue Nov 13 13:31:22 2012 +0100
+++ b/src/share/classes/java/util/streams/PipelineHelper.java	Wed Nov 14 10:52:44 2012 +0100
@@ -28,15 +28,13 @@
 import java.util.streams.ops.Node;
 
 /**
- *
- * @param <P_IN> Type of elements input to the pipeline.
+ * @param <P_IN>  Type of elements input to the pipeline.
  * @param <P_OUT> Type of elements output from the pipeline.
  */
 public interface PipelineHelper<P_IN, P_OUT> {
 
     /**
      * @return the combined stream and operation flags.
-     *
      * @see {@link StreamOpFlags}
      */
     int getStreamFlags();
@@ -86,4 +84,4 @@
      * @return the node containing all output elements.
      */
     Node<P_OUT> collectOutput();
-}
+}
\ No newline at end of file
--- a/src/share/classes/java/util/streams/StreamShape.java	Tue Nov 13 13:31:22 2012 +0100
+++ b/src/share/classes/java/util/streams/StreamShape.java	Wed Nov 14 10:52:44 2012 +0100
@@ -26,16 +26,16 @@
 
 import java.util.streams.ops.IntermediateOp;
 import java.util.streams.ops.Node;
+import java.util.streams.ops.NodeBuilder;
 
 /**
-* StreamShape
-*
-* @author Brian Goetz
-*/
+ * StreamShape
+ *
+ * @author Brian Goetz
+ */
 public interface StreamShape<S extends BaseStream> {
 
     /**
-     *
      * @return the stream type of this shape.
      */
     Class<S> getStreamType();
@@ -54,7 +54,6 @@
      */
     <U, V> AbstractPipeline<U, V> chain(AbstractPipeline<?, U> upstream, IntermediateOp<U, V> op);
 
-
     // @@@ Should this be the place where the base mechanisms to create a stream reside?
     //     i.e. spliterator plus flags
 
@@ -63,6 +62,7 @@
     /**
      * Create a sequential pipeline from a compatible node.
      * <p>A compatible node is a node whose contents is compatible with the stream shape.</p>
+     *
      * @param n the compatible node
      * @return the pipeline, whose content is sourced from the node.
      * @throws IllegalArgumentException if the node is not compatible with this stream shape.
@@ -72,9 +72,35 @@
     /**
      * Create a parallel pipeline from a compatible node.
      * <p>A compatible node is a node whose contents is compatible with the stream shape.</p>
+     *
      * @param n the compatible node
      * @return the pipeline, whose content is sourced from the node.
      * @throws IllegalArgumentException if the node is not compatible with this stream shape.
      */
     <T> AbstractPipeline<?, T> parallel(Node<T> n) throws IllegalArgumentException;
+
+    /**
+     * Make a node builder that can hold elements of this shape.
+     *
+     * @param sizeIfKnown if >=0 then a node builder will be created that has a fixed capacity of at most
+     *                    sizeIfKnown elements.
+     *                    If < 0 then the node builder has an unfixed capacity.
+     *                    A fixed capacity node builder will throw exceptions if an element is added and
+     *                    the builder has reached capacity.
+     * @param <T> The type of elements.
+     * @return the node builder.
+     */
+    <T> NodeBuilder<T> makeNodeBuilder(int sizeIfKnown);
+
+    /**
+     * Collect elements output from a pipeline into Node that holds elements of this shape.
+     *
+     * @param helper the parallel pipeline helper from which elements are obtained.
+     * @param flattenTree true of the returned node should be flattened to one node holding an
+     *                    array of elements.
+     * @param <P_IN> the type of elements input into the pipeline.
+     * @param <T> the type of elements output from the pipeline.
+     * @return the node holding elements output from the pipeline.
+     */
+    <P_IN, T> Node<T> collect(ParallelPipelineHelper<P_IN, T> helper, boolean flattenTree);
 }
--- a/src/share/classes/java/util/streams/StreamShapeFactory.java	Tue Nov 13 13:31:22 2012 +0100
+++ b/src/share/classes/java/util/streams/StreamShapeFactory.java	Wed Nov 14 10:52:44 2012 +0100
@@ -25,8 +25,7 @@
 package java.util.streams;
 
 import java.util.WeakHashMap;
-import java.util.streams.ops.IntermediateOp;
-import java.util.streams.ops.Node;
+import java.util.streams.ops.*;
 
 public final class StreamShapeFactory {
 
@@ -60,6 +59,16 @@
                 return new ReferencePipeline<>(n.spliterator(),
                                              StreamOpFlags.IS_ORDERED | StreamOpFlags.IS_SIZED | StreamOpFlags.IS_PARALLEL);
             }
+
+            @Override
+            public <T> NodeBuilder<T> makeNodeBuilder(int sizeIfKnown) {
+                return Nodes.makeBuilder(sizeIfKnown);
+            }
+
+            @Override
+            public <P_IN, T> Node<T> collect(ParallelPipelineHelper<P_IN, T> helper, boolean flattenTree) {
+                return TreeUtils.collect(helper, flattenTree);
+            }
         };
 
         set(referenceStreamShape);
--- a/src/share/classes/java/util/streams/ops/AbstractTask.java	Tue Nov 13 13:31:22 2012 +0100
+++ b/src/share/classes/java/util/streams/ops/AbstractTask.java	Wed Nov 14 10:52:44 2012 +0100
@@ -38,7 +38,7 @@
  *
  * @author Brian Goetz
  */
-abstract class AbstractTask<P_IN, P_OUT, R, T extends AbstractTask<P_IN, P_OUT, R, T>>
+public abstract class AbstractTask<P_IN, P_OUT, R, T extends AbstractTask<P_IN, P_OUT, R, T>>
         extends CountedCompleter<R> {
     protected final ParallelPipelineHelper<P_IN, P_OUT> helper;
     protected final Spliterator<P_IN> spliterator;
--- a/src/share/classes/java/util/streams/ops/IntermediateOp.java	Tue Nov 13 13:31:22 2012 +0100
+++ b/src/share/classes/java/util/streams/ops/IntermediateOp.java	Wed Nov 14 10:52:44 2012 +0100
@@ -88,7 +88,7 @@
     @Override
     default <P_IN> Node<E_OUT> evaluateSequential(PipelineHelper<P_IN, E_IN> helper) {
         // @@@ Can we determine the size from the pipeline and this operation?
-        final NodeBuilder<E_OUT> nb = Nodes.makeVariableSizeBuilder();
+        final NodeBuilder<E_OUT> nb = outputShape().makeNodeBuilder(-1);
         helper.into(wrapSink(helper.getStreamFlags(), nb));
         return nb.build();
     }
--- a/src/share/classes/java/util/streams/ops/Node.java	Tue Nov 13 13:31:22 2012 +0100
+++ b/src/share/classes/java/util/streams/ops/Node.java	Wed Nov 14 10:52:44 2012 +0100
@@ -47,7 +47,7 @@
      *
      * @return the child nodes.
      */
-    default Iterator<Node<T>> children() {
+    default Iterator<? extends Node<T>> children() {
         return Collections.emptyIterator();
     }
 
@@ -57,7 +57,7 @@
      *
      * @return a flattened node which has no children but holds the equivalent content as this node.
      */
-    Node flatten();
+    Node<T> flatten();
 
     /**
      * View this node as an array.
@@ -73,7 +73,6 @@
     /**
      * Copy the content of this node into an array at an offset.
      *
-     *
      * @param array the array to copy the data into.
      * @param offset the offset into the array.
      * @throws IndexOutOfBoundsException if copying would cause access of data outside array bounds.
--- a/src/share/classes/java/util/streams/ops/Nodes.java	Tue Nov 13 13:31:22 2012 +0100
+++ b/src/share/classes/java/util/streams/ops/Nodes.java	Wed Nov 14 10:52:44 2012 +0100
@@ -316,7 +316,7 @@
 
         private static class ConcNodeSpliterator<T> implements Spliterator<T> {
             private Node<T> cur;
-            private Iterator<Node<T>> children;
+            private Iterator<? extends Node<T>> children;
             private int splitsLeft;
             private Iterator<T> iterator;
 
@@ -754,10 +754,7 @@
                 traversing = true;
 
                 if (isSpinedSpliterator) {
-                    Iterator<T> remaining = iterator();
-                    while (remaining.hasNext()) {
-                        block.apply(remaining.next());
-                    }
+                    iterator().forEach(block);
                 } else {
                     for (int i = offset; i < endOffset; i++) {
                         block.apply(elements[i]);
--- a/src/share/classes/java/util/streams/ops/OpUtils.java	Tue Nov 13 13:31:22 2012 +0100
+++ b/src/share/classes/java/util/streams/ops/OpUtils.java	Wed Nov 14 10:52:44 2012 +0100
@@ -110,7 +110,7 @@
     }
 
 
-    interface AccumulatingSink<T, R, K extends AccumulatingSink<T, R, K>> extends TerminalSink<T, R> {
+    public interface AccumulatingSink<T, R, K extends AccumulatingSink<T, R, K>> extends TerminalSink<T, R> {
         public void combine(K other);
         public void clearState();
     }
--- a/src/share/classes/java/util/streams/ops/TreeUtils.java	Tue Nov 13 13:31:22 2012 +0100
+++ b/src/share/classes/java/util/streams/ops/TreeUtils.java	Wed Nov 14 10:52:44 2012 +0100
@@ -205,7 +205,7 @@
             if (node.getChildCount() > 0) {
                 setPendingCount(node.getChildCount() - 1);
 
-                final Iterator<Node<T>> itNodes = node.children();
+                final Iterator<? extends Node<T>> itNodes = node.children();
 
                 final ToArrayTask<T> firstTask = new ToArrayTask<>(this, itNodes.next(), offset);
                 int size = firstTask.node.size();