changeset 7488:c499c13fa66a

Javadoc for AbstractPipeline
author briangoetz
date Wed, 20 Feb 2013 17:20:17 -0500
parents 1ca7473e08aa
children a66e0dfd2973
files src/share/classes/java/util/stream/AbstractPipeline.java src/share/classes/java/util/stream/ReferencePipeline.java
diffstat 2 files changed, 104 insertions(+), 58 deletions(-) [+]
line wrap: on
line diff
--- a/src/share/classes/java/util/stream/AbstractPipeline.java	Wed Feb 20 13:49:08 2013 -0800
+++ b/src/share/classes/java/util/stream/AbstractPipeline.java	Wed Feb 20 17:20:17 2013 -0500
@@ -24,40 +24,60 @@
  */
 package java.util.stream;
 
-import java.util.Iterator;
 import java.util.Objects;
 import java.util.Spliterator;
+import java.util.function.IntFunction;
 import java.util.function.Supplier;
-import java.util.function.IntFunction;
 
 /**
- * The abstract pipeline implementation from which concrete implementations extend from.
- * <p>
- * Evaluates the pipeline to one of the following: a result for given a terminal operation; to a Node; to an Iterator
- * or; to a Spliterator.
- * </p>
- * <p>
- * An operation may be chained to the end this pipeline. The pipeline chain is a linear chain, only one operation
- * may be chained, otherwise an {@link IllegalStateException} is thrown.
- * </p>
- * <p>
- * For a parallel stream the chaining of a stateful operation results in the creation of a new pipeline chain.
- * The source to this new chain is a supplier of a spliterator that is the delayed evaluation of the stateful
- * operation and this pipeline to a Node from which the spliterator is obtained.
- * </p>
+ * Abstract base class for "pipeline" classes, which are the core implementations of the Stream interface and its
+ * primitive specializations.  Manages construction and evaluation of stream pipelines.
+ *
+ * <p>An {@code AbstractPipeline} represents an initial portion of a stream pipeline, encapsulating a stream
+ * source and zero or more intermediate operations.  Methods in this class fall into one of three categories:
+ * support for creating new streams by chaining an additional intermediate operation; support for evaluating streams
+ * by executing a terminal operation; and support for gaining additional information about the stream pipeline, such
+ * as {@link #getStreamFlags()}.  After chaining a new intermediate operation, the stream is considered to be in the
+ * @{code LINKED} state, meaning that no more intermediate or terminal operations are permitted on this stream
+ * instance.  After executing a terminal operation, the source is considered to be consumed and no more intermediate
+ * or terminal operations are permitted on this stream instance.
+ *
+ * <p>{@code AbstractPipeline} implements a number of methods that are specified in {@link BaseStream}, though it does
+ * not implement {@code BaseStream} directly.  Subclasses of {@code AbstractPipeline} will generally implement
+ * {@code BaseStream}.
+ *
+ * <p>For sequential streams, and parallel streams without <a href="package-summary.html#StreamOps">stateful
+ * intermediate operations</a>, parallel streams, pipeline evaluation is done in a single pass that "jams" all the
+ * operations together.  For parallel streams with stateful operations, execution is divided into segments,
+ * where each stateful operations marks the end of a segment, and each segment is evaluated separately and the
+ * result used as the input to the next segment.  In all cases, the source data is not consumed until a terminal
+ * operation begins.
  *
  * @param <E_IN>  Type of input elements.
  * @param <E_OUT> Type of output elements.
- *
+ * @param <S> Type of the subclass implementing {@code BaseStream}
  * @since 1.8
  */
 abstract class AbstractPipeline<E_IN, E_OUT, S extends BaseStream<E_OUT, S>> /* implements BaseStream */ {
+    /** The "upstream" pipeline, or null if this pipeline object represents the stream source */
     protected final AbstractPipeline upstream;
+
+    /** The number of intermediate operations between this pipeline object and the stream source */
     protected final int depth;
+
+    /** The intermediate operation represented by this pipeline object, or null if this pipeline object
+     * represents the stream source */
     protected final IntermediateOp op;
+
+    /** The combined source and operation flags for the source and all operations up to and including the
+     * operation represented by this pipeline object
+     */
     protected final int combinedSourceAndOpFlags;
 
-    // Shared between all stages of pipeline
+    /**
+     * The source spliterator for this pipeline object, which is common to all pipeline objects in the same
+     * stream pipeline.
+     */
     private final Supplier<? extends Spliterator<?>> source;
 
     private static enum PipelineState {
@@ -99,21 +119,26 @@
     }
 
     /**
-     * Constructor for the element source of a pipeline.
+     * Constructor for the head of a stream pipeline.
+     *
+     * @param source {@code Spliterator} describing the stream source
+     * @param sourceFlags The source flags for the stream source, described in {@link StreamOpFlags}
      */
-    protected AbstractPipeline(Supplier<Spliterator<?>> source, int sourceFlags) {
+    protected AbstractPipeline(Supplier<Spliterator<?>> source,
+                               int sourceFlags) {
         this(null, 0, null,
              StreamOpFlag.combineOpFlags(sourceFlags, StreamOpFlag.INITIAL_OPS_VALUE),
              Objects.requireNonNull(source));
     }
 
     /**
-     * Constructor for a pipeline operation.
+     * Constructor for appending an intermediate operation onto an existing pipeline.
      *
      * @param upstream the upstream element source.
      * @param op the operation performed upon elements.
      */
-    protected AbstractPipeline(AbstractPipeline<?, E_IN, ?> upstream, IntermediateOp<E_IN, E_OUT> op) {
+    protected AbstractPipeline(AbstractPipeline<?, E_IN, ?> upstream,
+                               IntermediateOp<E_IN, E_OUT> op) {
         this(upstream, upstream.depth + 1,
              op, StreamOpFlag.combineOpFlags(op.getOpFlags() & StreamOpFlag.OP_MASK,
                                              upstream.combinedSourceAndOpFlags),
@@ -645,15 +670,16 @@
 
             Node<E_OUT> n = getSourceNodeIfAvailable();
             if (n != null) {
-                return (flatten && isParallel()) ? AbstractPipeline.this.flatten(this, n) : n;
-            }
-            else if (isParallel()) {
-                return AbstractPipeline.this.collect(this, flatten);
-            }
-            else {
-                Node.Builder<E_OUT> nb = makeNodeBuilder(
-                        exactOutputSizeIfKnown(sourceSpliterator()));
-                return into(nb, sourceSpliterator()).build();
+                return AbstractPipeline.this.flatten(this, n);
+            } else {
+                if (isParallel()) {
+                    return AbstractPipeline.this.collect(this, flatten);
+                }
+                else {
+                    Node.Builder<E_OUT> nb = makeNodeBuilder(
+                            exactOutputSizeIfKnown(sourceSpliterator()));
+                    return into(nb, sourceSpliterator()).build();
+                }
             }
         }
 
--- a/src/share/classes/java/util/stream/ReferencePipeline.java	Wed Feb 20 13:49:08 2013 -0800
+++ b/src/share/classes/java/util/stream/ReferencePipeline.java	Wed Feb 20 17:20:17 2013 -0500
@@ -42,7 +42,7 @@
 import java.util.function.ToLongFunction;
 
 /**
- * Implementation for a {@link Stream} whose elements that are references to objects of type <code>T</code>.
+ * Implementation for a {@link Stream} whose elements objects of type {@code U}.
  *
  * @param <T> Type of elements in the upstream source.
  * @param <U> Type of elements in produced by this stage.
@@ -51,14 +51,29 @@
  */
 class ReferencePipeline<T, U> extends AbstractPipeline<T, U, Stream<U>> implements Stream<U>  {
 
-    public<S> ReferencePipeline(Supplier<? extends Spliterator<S>> supplier, int sourceFlags) {
-        super((Supplier) supplier, sourceFlags);
+    /**
+     * Constructor for the head of a stream pipeline.
+     *
+     * @param source {@code Spliterator} describing the stream source
+     * @param sourceFlags The source flags for the stream source, described in {@link StreamOpFlags}
+     */
+    public<S> ReferencePipeline(Supplier<? extends Spliterator<S>> source,
+                                int sourceFlags) {
+        super((Supplier) source, sourceFlags);
     }
 
+    /**
+     * Constructor for appending an intermediate operation onto an existing pipeline.
+     *
+     * @param upstream the upstream element source.
+     * @param op the operation performed upon elements.
+     */
     public ReferencePipeline(AbstractPipeline<?, T, ?> upstream, IntermediateOp<T, U> op) {
         super(upstream, op);
     }
 
+    // Methods from AbstractPipeline
+
     @Override
     protected StreamShape getOutputShape() {
         return StreamShape.REFERENCE;
@@ -99,7 +114,7 @@
         return Nodes.makeBuilder(exactSizeIfKnown, generator);
     }
 
-    //
+    // Stateless intermediate operations from Stream
 
     @Override
     public Stream<U> filter(Predicate<? super U> predicate) {
@@ -211,6 +226,21 @@
     }
 
     @Override
+    public Stream<U> peek(Consumer<? super U> tee) {
+        Objects.requireNonNull(tee);
+        return chainedToRef(0, StreamShape.REFERENCE,
+                            (flags, sink) -> new Sink.ChainedReference<U>(sink) {
+                                @Override
+                                public void accept(U u) {
+                                    tee.accept(u);
+                                    downstream.accept(u);
+                                }
+                            });
+    }
+
+    // Stateful intermediate operations from Stream
+
+    @Override
     public Stream<U> distinct() {
         return pipeline(new DistinctOp<U>());
     }
@@ -226,29 +256,6 @@
     }
 
     @Override
-    public void forEach(Consumer<? super U> consumer) {
-        pipeline(ForEachOp.make(consumer));
-    }
-
-    @Override
-    public void forEachUntilCancelled(Consumer<? super U> consumer, BooleanSupplier cancelledFunction) {
-        pipeline(ForEachUntilOp.make(consumer, cancelledFunction));
-    }
-
-    @Override
-    public Stream<U> peek(Consumer<? super U> tee) {
-        Objects.requireNonNull(tee);
-        return chainedToRef(0, StreamShape.REFERENCE,
-                            (flags, sink) -> new Sink.ChainedReference<U>(sink) {
-                                @Override
-                                public void accept(U u) {
-                                    tee.accept(u);
-                                    downstream.accept(u);
-                                }
-                            });
-    }
-
-    @Override
     public Stream<U> limit(long maxSize) {
         if (maxSize < 0)
             throw new IllegalArgumentException(Long.toString(maxSize));
@@ -272,6 +279,19 @@
         return super.slice(startingOffset, endingOffset - startingOffset);
     }
 
+
+    // Terminal operations from Stream
+
+    @Override
+    public void forEach(Consumer<? super U> consumer) {
+        pipeline(ForEachOp.make(consumer));
+    }
+
+    @Override
+    public void forEachUntilCancelled(Consumer<? super U> consumer, BooleanSupplier cancelledFunction) {
+        pipeline(ForEachUntilOp.make(consumer, cancelledFunction));
+    }
+
     @Override
     @SuppressWarnings("unchecked")
     public <A> A[] toArray(IntFunction<A[]> generator) {