changeset 7633:b957521c820d

Updates from review of 8008670
author briangoetz
date Wed, 13 Mar 2013 18:47:28 -0400
parents a657ad5273ba
children b55d7d5f66fa
files src/share/classes/java/util/stream/AbstractTask.java src/share/classes/java/util/stream/ForEachOps.java src/share/classes/java/util/stream/IntermediateOp.java src/share/classes/java/util/stream/Node.java src/share/classes/java/util/stream/Sink.java src/share/classes/java/util/stream/StatefulOp.java src/share/classes/java/util/stream/Tripwire.java
diffstat 7 files changed, 70 insertions(+), 36 deletions(-) [+]
line wrap: on
line diff
--- a/src/share/classes/java/util/stream/AbstractTask.java	Wed Mar 13 18:45:25 2013 -0400
+++ b/src/share/classes/java/util/stream/AbstractTask.java	Wed Mar 13 18:47:28 2013 -0400
@@ -31,8 +31,9 @@
 /**
  * Abstract base class for most fork-join tasks used to implement stream ops.
  * Manages splitting logic, tracking of child tasks, and intermediate results.
- * Each task is associated with a {@link Spliterator} that describes a portion
- * of the input.  Tasks may be leaf nodes (which will traverse the elements of
+ * Each task is associated with a {@link Spliterator} that describes the portion
+ * of the input associated with the subtree rooted at this task.
+ * Tasks may be leaf nodes (which will traverse the elements of
  * the {@code Spliterator}) or internal nodes (which split the
  * {@code Spliterator} into multiple child tasks).
  *
@@ -75,7 +76,7 @@
  * @param <P_OUT> Type of elements output from the pipeline
  * @param <R> Type of intermediate result, which may be different from operation
  *        result type
- * @param <T> Type of child and sibling tasks
+ * @param <T> Type of parent, child and sibling tasks
  * @since 1.8
  */
 abstract class AbstractTask<P_IN, P_OUT, R, T extends AbstractTask<P_IN, P_OUT, R, T>>
@@ -171,13 +172,24 @@
         return suggestSplit(helper, spliterator, targetSize);
     }
 
-    /** Returns the local result, if any */
+    /**
+     * Returns the local result, if any.  Subclasses should use {@link #setLocalResult(Object)} and
+     * {@link #getLocalResult()} to manage results.  This returns the local result so that calls from
+     * within the fork-join framework will return the correct result.
+     *
+     * @return the local result.
+     */
     @Override
     public R getRawResult() {
         return localResult;
     }
 
-    /** Does nothing; argument must be null, or an exception is thrown */
+    /**
+     * Does nothing; instead, subclasses should use {@link #setLocalResult(Object)}} to manage results.
+     *
+     * @param result must be null, or an exception is thrown (this is a safety tripwire to detect when
+     *               {@code setRawResult()} is being used instead of {@code setLocalResult()}
+     */
     @Override
     protected void setRawResult(R result) {
         if (result != null)
@@ -292,6 +304,7 @@
     public void onCompletion(CountedCompleter<?> caller) {
         spliterator = null;
         children = null;
+        numChildren = 0;
     }
 
     /**
--- a/src/share/classes/java/util/stream/ForEachOps.java	Wed Mar 13 18:45:25 2013 -0400
+++ b/src/share/classes/java/util/stream/ForEachOps.java	Wed Mar 13 18:47:28 2013 -0400
@@ -41,18 +41,25 @@
  * elements to a {@code Consumer}.
  *
  * <p>{@code forEachUntil} traverses elements of a stream and sends those
- * elements to to a {@code Consumer} until a {@code BooleanProvider} indicates
+ * elements to to a {@code Consumer} until a {@code BooleanSupplier} indicates
  * that a termination criteria has occurred and no more elements should be
  * traversed and sent.
  *
  * <p>For either type of traversal elements will be sent to the {@code Consumer}
- * in whatever thread and whatever order they become available, independent of
+ * on whatever thread and whatever order they become available, independent of
  * the stream's encounter order.
  *
  * <p>Exceptions occurring as a result of sending an element to the
  * {@code Consumer} will be relayed to the caller and traversal will be
  * prematurely terminated.
  *
+ * <p>There is no guarantee that additional elements will not be traversed and
+ * sent after the termination criteria has transpired.  For example, a
+ * termination criteria of {@code resultSet.size() > TARGET} does not guarantee
+ * that the result set will receive no more than {@code TARGET} elements, but
+ * instead that {@code forEachUntil} traversal will attempt to stop after
+ * {@code TARGET} elements have been placed in the {@code resultSet}.
+ *
  * @apiNote
  * The termination condition is an externally-imposed criteria, and is useful
  * for problems like "find the best answer that can be found in ten seconds",
@@ -60,13 +67,6 @@
  * designed to provide content-based cancellation, such as "process elements
  * until you find one which matches a given criteria."
  *
- * <p>There is no guarantee that additional elements will not be traversed and
- * sent after the termination criteria has transpired.  For example, a
- * termination criteria of {@code resultSet.size() > TARGET} does not guarantee
- * that the result set will receive no more than {@code TARGET} elements, but
- * instead that {@code forEachUntil} traversal will attempt to stop after
- * {@code TARGET} elements have been placed in the {@code resultSet}.
- *
  * @since 1.8
  */
 final class ForEachOps {
--- a/src/share/classes/java/util/stream/IntermediateOp.java	Wed Mar 13 18:45:25 2013 -0400
+++ b/src/share/classes/java/util/stream/IntermediateOp.java	Wed Mar 13 18:47:28 2013 -0400
@@ -27,8 +27,9 @@
 /**
  * An operation in a stream pipeline that takes a stream as input and produces
  * a stream, possibly of a different type, as output.  An intermediate operation
- * has an input type and an output type (and, an associated input shape and
- * output shape).  An intermediate operation also has a set of <em>operation
+ * has an input type and an output type, reflected in its type parameters
+ * {@code E_IN} and {@code E_OUT}, and, an associated input shape and
+ * output shape.  An intermediate operation also has a set of <em>operation
  * flags</em> that describes how it transforms characteristics of the stream
  * (such as sortedness or size; see {@link StreamOpFlag}).
  *
--- a/src/share/classes/java/util/stream/Node.java	Wed Mar 13 18:45:25 2013 -0400
+++ b/src/share/classes/java/util/stream/Node.java	Wed Mar 13 18:47:28 2013 -0400
@@ -71,7 +71,8 @@
 
     /**
      * Traverses the elements of this node, and invoke the provided
-     * {@code Consumer} with each element.
+     * {@code Consumer} with each element.  Elements are provided in in encounter order
+     * if the source for the {@code Node} has a defined encounter order.
      *
      * @param consumer A {@code Consumer} that is to be invoked with each
      *        element in this {@code Node}
@@ -104,13 +105,15 @@
     }
 
     /**
-     * Views this node as an array.
+     * Provide an array view of the contents of this node.
      *
-     * <p>Depending on the underlying implementation this may return a reference
-     * to an internal array rather than a copy.  It is the callers
-     * responsibility to decide if either this node or the array is utilized as
-     * the primary reference for the data.</p>
+     * <p>Depending on the underlying implementation, this may return a reference
+     * to an internal array rather than a copy.  Since the returned array may be shared,
+     * the resulting array should not be modified.  The {@code generator}
+     * function may be consulted to create the array if a new array needs to be created.
      *
+     * @param generator A factory function which takes an integer parameter and returns
+     *                  a new, empty array of that size and of the appropriate array type
      * @return an array containing the contents of this {@code Node}
      */
     T[] asArray(IntFunction<T[]> generator);
--- a/src/share/classes/java/util/stream/Sink.java	Wed Mar 13 18:45:25 2013 -0400
+++ b/src/share/classes/java/util/stream/Sink.java	Wed Mar 13 18:47:28 2013 -0400
@@ -44,6 +44,12 @@
  * method), which a source can poll before sending more data to the
  * {@code Sink}.
  *
+ * <p>A sink may be in one of two states: an initial state and an active
+ * state.   It starts out in the initial state; the {@code begin()} method transitions
+ * it to the active state, and the {@code end()} method transitions it back into the
+ * initial state, where it can be re-used.  Data-accepting methods (such as {@code accept()}
+ * are only valid in the active state.
+ *
  * @apiNote
  *
  * A stream pipeline consists of a source, zero or more intermediate stages
@@ -58,12 +64,12 @@
  *                  .max();
  * </pre>
  *
- * Here, we have three stages, filtering, mapping, and reducing.  The filtering
+ * <p>Here, we have three stages, filtering, mapping, and reducing.  The filtering
  * stage consumes strings and emits a subset of those strings; the mapping stage
  * consumes strings and emits ints; the reduction stage consumes those ints and
  * computes the maximal value.
  *
- * A {@code Sink} instance is used to represent each stage of this pipeline,
+ * <p>A {@code Sink} instance is used to represent each stage of this pipeline,
  * whether the stage accepts objects, ints, longs, or doubles.  Sink has entry
  * points for {@code accept(Object)}, {@code accept(int)}, etc, so that we do
  * not need a specialized interface for each primitive specialization.  (It
@@ -77,13 +83,13 @@
  * each stage must implement the correct {@code accept} method corresponding to
  * the data type it accepts.
  *
- * The specialized subtypes such as {@link Sink.OfInt} bridge
+ * <p>The specialized subtypes such as {@link Sink.OfInt} override
  * {@code accept(Object)} to call the appropriate primitive specialization of
  * {@code accept}, implement the appropriate primitive specialization of
  * {@code Consumer}, and re-abstract the appropriate primitive specialization of
  * {@code accept}.
  *
- * The chaining subtypes such as {@link ChainedInt} not only implement
+ * <p>The chaining subtypes such as {@link ChainedInt} not only implement
  * {@code Sink.OfInt}, but also maintain a {@code downstream} field which
  * represents the downstream {@code Sink}, and implement the methods
  * {@code begin()}, {@code end()}, and {@code cancellationRequested()} to
@@ -99,7 +105,7 @@
  *     };
  * </pre>
  *
- * Here, we implement {@code Sink.ChanedReference<U>}, meaning that we expect to
+ * <p>Here, we implement {@code Sink.ChainedReference<U>}, meaning that we expect to
  * receive elements of type {@code U} as input, and pass the downstream sink to
  * the constructor.  Because the next stage expects to receive integers, we must
  * call the {@code accept(int)} method when emitting values to the downstream.
@@ -116,13 +122,20 @@
      * {@code Sink} is being reused by multiple calculations.
      * @param size The exact size of the data to be pushed downstream, if
      * known or {@code Long.MAX_VALUE} if unknown or infinite.
+     *
+     * <p>Prior to this call, the sink must be in the initial state, and after this call
+     * it is in the active state.
      */
     default void begin(long size) {}
 
     /**
-     * Indicates that all elements have been pushed.  If the {@code Sink} buffers
-     * any results from previous values, they should dump their contents
-     * downstream and clear any stored state.
+     * Indicates that all elements have been pushed.  If the {@code Sink} is stateful,
+     * it should send any stored state downstream at this time, and should clear any
+     * accumulated state (and associated resources) so that the sink may be reused for
+     * another computation.
+     *
+     * <p>Prior to this call, the sink must be in the active state, and after this call
+     * it is returned to the initial state.
      */
     default void end() {}
 
@@ -130,6 +143,8 @@
      * Communicates to upstream sources that this {@code Sink} does not
      * wish to receive any more data
      *
+     * @implSpec The default implementation always returns false
+     *
      * @return true if cancellation is requested
      */
     default boolean cancellationRequested() {
@@ -138,6 +153,7 @@
 
     /**
      * Accepts an int value
+     *
      * @implSpec The default implementation throws IllegalStateException
      *
      * @throws IllegalStateException If this sink does not accept int values
@@ -221,7 +237,7 @@
     }
 
     /**
-     * {@code Sink} implementation designed for creating chains of sinks.  The
+     * Abstract {@code Sink} implementation designed for creating chains of sinks.  The
      * {@code begin} and {@code end}, and {@code cancellationRequested} methods
      * are wired to chain to the downstream {@code Sink}.  This implementation
      * takes a downstream {@code Sink} of unknown input shape and produces a
@@ -252,7 +268,7 @@
     }
 
     /**
-     * {@code Sink} implementation designed for creating chains of sinks.  The
+     * Abstract {@code Sink} implementation designed for creating chains of sinks.  The
      * {@code begin} and {@code end}, and {@code cancellationRequested} methods
      * are wired to chain to the downstream {@code Sink}.  This implementation
      * takes a downstream {@code Sink} of unknown input shape and produces a
@@ -284,7 +300,7 @@
     }
 
     /**
-     * {@code Sink} implementation designed for creating chains of sinks.  The
+     * Abstract {@code Sink} implementation designed for creating chains of sinks.  The
      * {@code begin} and {@code end}, and {@code cancellationRequested} methods
      * are wired to chain to the downstream {@code Sink}.  This implementation
      * takes a downstream {@code Sink} of unknown input shape and produces a
@@ -316,7 +332,7 @@
     }
 
     /**
-     * {@code Sink} implementation designed for creating chains of sinks.  The
+     * Abstract {@code Sink} implementation designed for creating chains of sinks.  The
      * {@code begin} and {@code end}, and {@code cancellationRequested} methods
      * are wired to chain to the downstream {@code Sink}.  This implementation
      * takes a downstream {@code Sink} of unknown input shape and produces a
--- a/src/share/classes/java/util/stream/StatefulOp.java	Wed Mar 13 18:45:25 2013 -0400
+++ b/src/share/classes/java/util/stream/StatefulOp.java	Wed Mar 13 18:47:28 2013 -0400
@@ -40,7 +40,7 @@
  * parallelizable, but are not amenable to the automatic parallelization of
  * stateless operations.  Accordingly, a stateful operation must provide its
  * own parallel execution implementation
- * ({@link #evaluateParallel(PipelineHelper)}).
+ * ({@link #evaluateParallel(PipelineHelper)}) as well as {@link IntermediateOp#wrapSink(int, Sink)}.
  *
  * @param <E> Type of input and output elements.
  *
@@ -64,6 +64,7 @@
     /**
      * {@inheritDoc}
      *
+     * @apiNote
      * An implementation of this method must be provided, but it is acceptable
      * if the implementation is sequential.  A generic sequential implementation
      * is available as
--- a/src/share/classes/java/util/stream/Tripwire.java	Wed Mar 13 18:45:25 2013 -0400
+++ b/src/share/classes/java/util/stream/Tripwire.java	Wed Mar 13 18:47:28 2013 -0400
@@ -32,7 +32,7 @@
  * {@code java.util.stream} classes.  The detection is turned on or off based on
  * whether the system property {@code org.openjdk.java.util.stream.tripwire} is
  * considered {@code true} according to {@link Boolean#getBoolean(String)}.
- * Turned off for production.
+ * This should normally be turned off for production use.
  *
  * @apiNote
  * Typical usage would be for boxing code to do: