changeset 6345:d297a36079ce

Fix errors in handling of parallel streams that have already been pulled from; better testing of mixed mode parallel streams Contributed-By: paul.sandoz@oracle.com
author briangoetz
date Wed, 24 Oct 2012 11:31:21 -0400
parents f5f46449a65e
children c01df4ef1790
files src/share/classes/java/util/streams/AbstractPipeline.java test-ng/tests/org/openjdk/tests/java/util/streams/ops/StreamOpTestCase.java
diffstat 2 files changed, 31 insertions(+), 6 deletions(-) [+]
line wrap: on
line diff
--- a/src/share/classes/java/util/streams/AbstractPipeline.java	Wed Oct 24 11:26:38 2012 -0400
+++ b/src/share/classes/java/util/streams/AbstractPipeline.java	Wed Oct 24 11:31:21 2012 -0400
@@ -89,11 +89,9 @@
     }
 
     protected<R> R evaluateParallel(TerminalOp<E_OUT, R> terminal) {
-        // @@@ Need to check if any upstream streams have been pulled using iterator
         if (iterator != null) {
-            // @@@ Is this assumption correct for all sources and pipelines?
-            // @@@ Can default to serial evaluation
-            throw new IllegalStateException("A stream that has been iterated on (partially or otherwise) cannot be evaluated in parallel");
+            // If already pulled then cannot split, revert back to sequential evaluation
+            return evaluateSerial(terminal);
         }
 
         final IntermediateOp[] ops = ops();
@@ -232,10 +230,14 @@
         }
 
         @Override
+        public int getOutputSizeIfKnown() {
+            return (iterator == null && isOutputSizeKnown()) ? source.getSizeIfKnown() : -1;
+        }
+
+        @Override
         public<S extends Sink<E_OUT>> S into(S sink) {
             Objects.requireNonNull(sink);
 
-            // @@@ Need to check if any upstream streams have been pulled using iterator
             if (isShortCircuit() || iterator != null) {
                 Iterator<E_OUT> it = iterator();
                 sink.begin(-1);
--- a/test-ng/tests/org/openjdk/tests/java/util/streams/ops/StreamOpTestCase.java	Wed Oct 24 11:26:38 2012 -0400
+++ b/test-ng/tests/org/openjdk/tests/java/util/streams/ops/StreamOpTestCase.java	Wed Oct 24 11:31:21 2012 -0400
@@ -100,7 +100,7 @@
         },
 
         // Wrap as stream, and iterate in mixed mode
-        STREAM_ITERATOR_FOR_EACH(false) {
+        STREAM_MIXED(false) {
             <T> void run(TestData<T> data, Sink sink, IntermediateOp[] ops) {
                 Stream<?> stream = stream(data.seq(ops));
                 Iterator<?> iter = stream.iterator();
@@ -134,6 +134,29 @@
             }
         },
 
+        // Wrap as parallel stream, and iterate in mixed mode
+        PAR_STREAM_ITERATOR_TO_ARRAY_MIXED(true) {
+            <T> void run(TestData<T> data, Sink sink, IntermediateOp[] ops) {
+                Stream<?> stream = stream(data.par(ops));
+                Iterator<?> iter = stream.iterator();
+                if (iter.hasNext())
+                    sink.accept(iter.next());
+                for (Object t : stream.toArray())
+                    sink.accept(t);
+            }
+        },
+
+        // Wrap as parallel stream, and iterate in mixed mode
+        PAR_STREAM_SEQUENTIAL_MIXED(true) {
+            <T> void run(TestData<T> data, Sink sink, IntermediateOp[] ops) {
+                Stream<?> stream = stream(data.par(ops));
+                Iterator<?> iter = stream.iterator();
+                if (iter.hasNext())
+                    sink.accept(iter.next());
+                stream.sequential().forEach(sink);
+            }
+        },
+
         // More ways to iterate the PSS: iterate result of op
         // Extends testing to test whether computation happens in- or out-of-thread
         ;