changeset 6645:2f52eeca15e5

- pipeline wrapping spliterators should conform to spliterator contract - obtain the size, if known, from a spliterator before traversing operations are performed as such operations may be greedy and consume elements thus affecting the size. - when obtaining a spliterator for a depth 0 i.e. the source spliterator try to return the best spliterator for use with parallel streams.
author psandoz
date Thu, 29 Nov 2012 15:14:19 +0100
parents 3511cfe8925d
children 4d1f68c59366 cbef342202eb
files src/share/classes/java/util/stream/AbstractPipeline.java src/share/classes/java/util/stream/StreamShapeFactory.java
diffstat 2 files changed, 49 insertions(+), 12 deletions(-) [+]
line wrap: on
line diff
--- a/src/share/classes/java/util/stream/AbstractPipeline.java	Thu Nov 29 11:58:15 2012 +0100
+++ b/src/share/classes/java/util/stream/AbstractPipeline.java	Thu Nov 29 15:14:19 2012 +0100
@@ -531,16 +531,31 @@
         failOnLinked();
 
         if (depth == 0) {
-            return isParallel()
-                   ? (Spliterator<E_OUT>) sourceState.spliterator()
-                   : (Spliterator<E_OUT>) getOutputShape().sequentialSpliterator(sourceState.spliterator());
+            // Return the best spliterator for use with parallel evaluation
+            // If the source spliterator can be split then return that otherwise
+            // obtain the iterator of the spliterator then wrap that iterator
+            // in spliterator that supports splitting
+            Spliterator<E_OUT> s = (Spliterator<E_OUT>) sourceState.spliterator();
+            if (s.getNaturalSplits() > 0)  {
+                return s;
+            }
+            else {
+                // Obtain the size, if known, before obtaining the iterator
+                // The action of obtaining the iterator may be greedy and consume elements thus changing
+                // the size
+                int sizeIfKnown = s.getSizeIfKnown();
+                return getOutputShape().iteratorSpliterator(s.iterator(), sizeIfKnown);
+            }
         }
         else if (!StreamOpFlag.PARALLEL.isKnown(sourceState.getSourceFlags())) {
+            // If the source is not parallel then all of the pipeline is sequential
             Spliterator<?> s = sourceState.spliterator();
-            Iterator<E_OUT> iterator = wrapIterator(s.iterator(),
-                                                    sourceState.getSourceFlags());
-            return getOutputShape().iteratorSpliterator(iterator,
-                                                        StreamOpFlag.SIZED.isKnown(combinedOpFlags) ? s.getSizeIfKnown() : -1);
+            // Obtain the size, if known, before obtaining the iterator
+            // The action of obtaining the iterator may be greedy and consume elements thus changing
+            // the size
+            int sizeIfKnown = StreamOpFlag.SIZED.isKnown(combinedOpFlags) ? s.getSizeIfKnown() : -1;
+            Iterator<E_OUT> iterator = wrapIterator(s.iterator(), sourceState.getSourceFlags());
+            return getOutputShape().iteratorSpliterator(iterator, sizeIfKnown);
         }
         else {
             // Find the depth of the last stateful op to be evaluated in parallel
--- a/src/share/classes/java/util/stream/StreamShapeFactory.java	Thu Nov 29 11:58:15 2012 +0100
+++ b/src/share/classes/java/util/stream/StreamShapeFactory.java	Thu Nov 29 15:14:19 2012 +0100
@@ -24,6 +24,7 @@
  */
 package java.util.stream;
 
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.Iterators;
 import java.util.function.Block;
@@ -98,6 +99,8 @@
                 // @@@ Move to Streams?
                 class WrappingSpliterator implements Spliterator<P_OUT> {
                     final Spliterator<P_IN> spliterator;
+                    Iterator<P_OUT> iterator;
+
                     WrappingSpliterator(Spliterator<P_IN> spliterator) {
                         this.spliterator = spliterator;
                     }
@@ -109,17 +112,25 @@
 
                     @Override
                     public Spliterator<P_OUT> split() {
+                        if (iterator != null) {
+                            throw new IllegalStateException("split after starting traversal");
+                        }
                         return new WrappingSpliterator(spliterator.split());
                     }
 
                     @Override
                     public Iterator<P_OUT> iterator() {
-                        return pph.wrapIterator(spliterator.iterator());
+                        if (iterator == null) {
+                            iterator = pph.wrapIterator(spliterator.iterator());
+                        }
+                        return iterator;
                     }
 
                     @Override
                     public void forEach(Block<? super P_OUT> block) {
-                        if (!pph.isShortCircuit()) {
+                        if (!pph.isShortCircuit() && iterator == null) {
+                            iterator = Collections.emptyIterator();
+
                             Sink.OfReference<P_OUT> s = block::accept;
                             Sink wrapped = pph.wrapSink(s);
 
@@ -205,6 +216,7 @@
                 // @@@ Move to Primitives?
                 class WrappingSpliterator implements IntSpliterator {
                     final Spliterator spliterator;
+                    IntIterator iterator;
 
                     WrappingSpliterator(Spliterator spliterator) {
                         this.spliterator = spliterator;
@@ -217,12 +229,18 @@
 
                     @Override
                     public IntSpliterator split() {
+                        if (iterator != null) {
+                            throw new IllegalStateException("split after starting traversal");
+                        }
                         return new WrappingSpliterator(spliterator.split());
                     }
 
                     @Override
                     public IntIterator iterator() {
-                        return Primitives.adapt(pph.wrapIterator(spliterator.iterator()));
+                        if (iterator == null) {
+                            iterator = Primitives.adapt(pph.wrapIterator(spliterator.iterator()));
+                        }
+                        return iterator;
                     }
 
                     @Override
@@ -230,7 +248,9 @@
                         if (block instanceof IntBlock) {
                             forEach((IntBlock) block);
                         }
-                        else if (!pph.isShortCircuit()) {
+                        else if (!pph.isShortCircuit() && iterator == null) {
+                            iterator = Primitives.emptyIntIterator();
+
                             Sink.OfReference<Integer> blockSink = block::accept;
                             Sink wrapped = pph.wrapSink(blockSink);
 
@@ -245,7 +265,9 @@
 
                     @Override
                     public void forEach(IntBlock block) {
-                        if (!pph.isShortCircuit()) {
+                        if (!pph.isShortCircuit() && iterator == null) {
+                            iterator = Primitives.emptyIntIterator();
+
                             Sink.OfInt s = block::accept;
                             Sink wrapped = pph.wrapSink(s);