changeset 6394:ba5cff68c181

Move BaseStream.getShape to AbstractPipeline.getOutputShape. This ensures BaseStream does not expose any SPI-related classes in the public API.
author psandoz
date Tue, 13 Nov 2012 10:10:14 +0100
parents 19cbd202a30a
children bac0249a3f18
files src/share/classes/java/util/streams/AbstractPipeline.java src/share/classes/java/util/streams/BaseStream.java
diffstat 2 files changed, 78 insertions(+), 71 deletions(-) [+]
line wrap: on
line diff
--- a/src/share/classes/java/util/streams/AbstractPipeline.java	Tue Nov 13 09:52:30 2012 +0100
+++ b/src/share/classes/java/util/streams/AbstractPipeline.java	Tue Nov 13 10:10:14 2012 +0100
@@ -122,7 +122,7 @@
         this.sourceFlags = upstream.sourceFlags;
         this.shape = upstream.shape;
 
-        assert upstream.getShape().getStreamType() == op.inputShape().getStreamType();
+        assert upstream.getOutputShape().getStreamType() == op.inputShape().getStreamType();
         assert (upstream.depth == 0) ^ (upstream.op != null);
     }
 
@@ -444,6 +444,82 @@
         return false;
     }
 
+    // Chaining and result methods
+
+    /**
+     * Chain an operation to the tail of this pipeline to create a new stream.
+     *
+     * @param op the operation to chain.
+     * @param <E_NEXT> the type of elements output from the new stream.
+     * @param <S> the type of stream.
+     * @return the new stream.
+     */
+    @SuppressWarnings("unchecked")
+    public <E_NEXT, S extends BaseStream<E_NEXT>> S pipeline(IntermediateOp<E_OUT, E_NEXT> op) {
+        return (S) op.outputShape().chain(this, op);
+    }
+
+    /**
+     * Evaluate the pipeline with a terminal operation to produce a result.
+     *
+     * @param terminal the terminal operation to be applied to the pipeline.
+     * @param <R> the type of result.
+     * @return the result.
+     */
+    public <R> R pipeline(TerminalOp<E_OUT, R> terminal) {
+        assert getOutputShape().getStreamType() == terminal.inputShape().getStreamType();
+        return evaluate(terminal);
+    }
+
+    /**
+     * Collect the elements output from the pipeline.
+     *
+     * @return a node that holds the collected output elements.
+     */
+    public Node<E_OUT> collectOutput() {
+        // @@@ Generalize CollectorOps for shape when primitive support is in place
+        TerminalOp<E_OUT, Node<E_OUT>> collect = new TerminalOp<E_OUT, Node<E_OUT>>() {
+            @Override
+            public StreamShape inputShape() {
+                return getOutputShape();
+            }
+
+            @Override
+            public <P_IN> Node<E_OUT> evaluateParallel(ParallelPipelineHelper<P_IN, E_OUT> helper) {
+                return helper.collectOutput();
+            }
+
+            @Override
+            public <P_IN> Node<E_OUT> evaluateSequential(PipelineHelper<P_IN, E_OUT> helper) {
+                return helper.collectOutput();
+            }
+        };
+        return evaluate(collect);
+    }
+
+    /**
+     * Get the output shape of the pipeline.
+     *
+     * @return the output shape. If the pipeline is the head then it's output shape corresponds to the shape of the
+     * source. Otherwise, it's output shape corresponds to the output shape of the associated operation.
+     */
+    public StreamShape getOutputShape() {
+        return op == null ? shape : op.outputShape();
+    }
+
+    /**
+     * Get the input shape of the pipeline.
+     *
+     * @return the input shape. If the pipeline is the head then it's input shape corresponds to the shape of the
+     * source. Otherwise, it's input shape corresponds to the input shape of the associated operation.
+     */
+    public StreamShape getInputShape() {
+        return op == null ? shape : op.inputShape();
+    }
+
+    // BaseStream
+
+    @Override
     @SuppressWarnings("unchecked")
     public Iterator<E_OUT> iterator() {
         if (iterator == null) {
@@ -474,70 +550,8 @@
         return iterator;
     }
 
-    public StreamShape getInputShape() {
-        return op == null ? shape : op.inputShape();
-    }
-
-    // Chaining and result methods
-
-    /**
-     * Chain an operation to the tail of this pipeline to create a new stream.
-     *
-     * @param op the operation to chain.
-     * @param <E_NEXT> the type of elements output from the new stream.
-     * @param <S> the type of stream.
-     * @return the new stream.
-     */
-    @SuppressWarnings("unchecked")
-    public <E_NEXT, S extends BaseStream<E_NEXT>> S pipeline(IntermediateOp<E_OUT, E_NEXT> op) {
-        return (S) op.outputShape().chain(this, op);
-    }
-
-    /**
-     * Evaluate the pipeline with a terminal operation to produce a result.
-     *
-     * @param terminal the terminal operation to be applied to the pipeline.
-     * @param <R> the type of result.
-     * @return the result.
-     */
-    public <R> R pipeline(TerminalOp<E_OUT, R> terminal) {
-        assert getShape().getStreamType() == terminal.inputShape().getStreamType();
-        return evaluate(terminal);
-    }
-
-    /**
-     * Collection the elements output from the pipeline.
-     *
-     * @return a node that holds the collected output elements.
-     */
-    public Node<E_OUT> collectOutput() {
-        // @@@ Generalize CollectorOps for shape when primitive support is in place
-        TerminalOp<E_OUT, Node<E_OUT>> collect = new TerminalOp<E_OUT, Node<E_OUT>>() {
-            @Override
-            public StreamShape inputShape() {
-                return getShape();
-            }
-
-            @Override
-            public <P_IN> Node<E_OUT> evaluateParallel(ParallelPipelineHelper<P_IN, E_OUT> helper) {
-                return helper.collectOutput();
-            }
-
-            @Override
-            public <P_IN> Node<E_OUT> evaluateSequential(PipelineHelper<P_IN, E_OUT> helper) {
-                return helper.collectOutput();
-            }
-        };
-        return evaluate(collect);
-    }
-
-    // BaseStream
-
+    @Override
     public boolean isParallel() {
         return StreamOpFlags.PARALLEL.isKnown(sourceFlags);
     }
-
-    public StreamShape getShape() {
-        return op == null ? shape : op.outputShape();
-    }
 }
--- a/src/share/classes/java/util/streams/BaseStream.java	Tue Nov 13 09:52:30 2012 +0100
+++ b/src/share/classes/java/util/streams/BaseStream.java	Tue Nov 13 10:10:14 2012 +0100
@@ -50,11 +50,4 @@
      * @return {@code true} if this stream may be split for parallel processing.
      */
     boolean isParallel();
-
-    /**
-     * Returns the shape of elements of this stream.
-     *
-     * @return the shape of elements of this stream.
-     */
-    StreamShape getShape();
 }