changeset 6436:1440e2129616

- Fail fast streams. BaseStream.iterator() is considered a terminal operation. Only one terminal operation may be performed on at most one stream in the pipeline, otherwise an ISE will result. - removed mixed test case scenarios
author psandoz
date Mon, 19 Nov 2012 15:19:28 +0100
parents b5c2376af431
children 1359dedd5475
files src/share/classes/java/util/stream/AbstractPipeline.java src/share/classes/java/util/stream/primitive/IntPipeline.java test-ng/tests/org/openjdk/tests/java/util/stream/OpTestCase.java test-ng/tests/org/openjdk/tests/java/util/stream/StreamReuseTest.java test-ng/tests/org/openjdk/tests/java/util/stream/StreamTestScenario.java test-ng/tests/org/openjdk/tests/java/util/stream/op/PrimitiveOpsTests.java test-ng/tests/org/openjdk/tests/java/util/stream/primitive/IntStreamTestScenario.java
diffstat 7 files changed, 244 insertions(+), 294 deletions(-) [+]
line wrap: on
line diff
--- a/src/share/classes/java/util/stream/AbstractPipeline.java	Sun Nov 18 23:51:46 2012 -0500
+++ b/src/share/classes/java/util/stream/AbstractPipeline.java	Mon Nov 19 15:19:28 2012 +0100
@@ -79,16 +79,7 @@
     protected final AbstractPipeline<?, E_IN> upstream;
     protected final IntermediateOp<E_IN, E_OUT> op;
     protected final int depth;
-
-    // Head state of pipeline
-    protected final Spliterator<?> spliterator;
-    protected final int sourceFlags;
-    protected final StreamShape shape;
-
-    /**
-     * If non-{@code null} then we are in sequential iteration mode.
-     */
-    protected Iterator<E_OUT> iterator;
+    protected final SourceState<?> sourceState;
 
     /**
      * Constructor for the element source of a pipeline.
@@ -98,13 +89,11 @@
      * @param shape the shape of the source
      */
     protected AbstractPipeline(Spliterator<?> spliterator, int sourceFlags, StreamShape shape) {
+        Objects.requireNonNull(spliterator);
         this.upstream = null;
         this.op = null;
         this.depth = 0;
-
-        this.spliterator = Objects.requireNonNull(spliterator);
-        this.sourceFlags = sourceFlags;
-        this.shape = shape;
+        this.sourceState = new SpliteratorSourceState<>(spliterator, sourceFlags, shape);
     }
 
     /**
@@ -117,30 +106,21 @@
         this.upstream = Objects.requireNonNull(upstream);
         this.op = Objects.requireNonNull(op);
         this.depth = upstream.depth + 1;
-
-        this.spliterator = upstream.spliterator;
-        this.sourceFlags = upstream.sourceFlags;
-        this.shape = upstream.shape;
+        this.sourceState = upstream.sourceState;
 
         assert upstream.getOutputShape().getStreamType() == op.inputShape().getStreamType();
         assert (upstream.depth == 0) ^ (upstream.op != null);
     }
 
     protected<R> R evaluate(TerminalOp<E_OUT, R> terminal) {
-        if (StreamOpFlags.PARALLEL.isKnown(sourceFlags)) {
+        if (StreamOpFlags.PARALLEL.isKnown(sourceState.getSourceFlags())) {
             return evaluateParallel(terminal);
         }
         else
-            return evaluateSequential(terminal, sourceFlags);
+            return evaluateSequential(terminal, sourceState.getSourceFlags());
     }
 
     protected<R> R evaluateParallel(TerminalOp<E_OUT, R> terminal) {
-        if (isPipelinePulled()) {
-            // If already pulled then cannot split, revert back to sequential evaluation
-            // Ensure the parallel flag is cleared
-            return evaluateSequential(terminal, StreamOpFlags.PARALLEL.clearFromFlags(sourceFlags));
-        }
-
         final IntermediateOp[] ops = ops();
         // Ops flags length is one greater than op array to hold initial value
         // Given an op whose index is i then the flags input to that op are at flags[i]
@@ -149,8 +129,8 @@
         int fromOp = 0;
         int upToOp = 0;
         Node<?> iNode = null;
-        Spliterator<?> iSource = spliterator;
-        int iSourceFlags = sourceFlags;
+        Spliterator<?> iSource = sourceState.spliterator();
+        int iSourceFlags = sourceState.getSourceFlags();
         while (true) {
             while (upToOp < ops.length && !ops[upToOp].isStateful())
                 upToOp++;
@@ -207,7 +187,7 @@
 
     @SuppressWarnings("unchecked")
     protected <R> R evaluateSequential(TerminalOp<E_OUT, R> terminal, int sourceFlags) {
-        return (R) terminal.evaluateSequential(new SequentialImplPipelineHelperSource(sourceFlags, terminal.inputShape()));
+        return (R) terminal.evaluateSequential(new SequentialImplPipelineHelper(sourceState.spliterator(), sourceFlags, terminal.inputShape()));
     }
 
     static abstract class AbstractPipelineHelper<P_IN, P_OUT> implements PipelineHelper<P_IN, P_OUT> {
@@ -323,6 +303,10 @@
             super(node, spliterator, sourceFlags, opsFlags, ops, from, to, terminalShape);
         }
 
+        <R> SequentialImplPipelineHelper(Spliterator spliterator, int sourceFlags, StreamShape terminalShape) {
+            this(spliterator, sourceFlags, ops(), terminalShape);
+        }
+
         boolean requirePull() {
             return isShortCircuit();
         }
@@ -332,7 +316,7 @@
             Objects.requireNonNull(sink);
 
             if (requirePull()) {
-                sink.begin(-1);
+                sink.begin(getOutputSizeIfKnown());
                 iterator().forEach(sink);
                 sink.end();
             }  else {
@@ -359,28 +343,6 @@
         }
     }
 
-    class SequentialImplPipelineHelperSource extends SequentialImplPipelineHelper {
-
-        <R> SequentialImplPipelineHelperSource(int sourceFlags, StreamShape terminalShape) {
-            super(AbstractPipeline.this.spliterator, sourceFlags, ops(), terminalShape);
-        }
-
-        @Override
-        boolean requirePull() {
-            return isPipelinePulled() ? true : super.requirePull();
-        }
-
-        @Override
-        public int getOutputSizeIfKnown() {
-            return isPipelinePulled() ? -1 : super.getOutputSizeIfKnown();
-        }
-
-        @Override
-        public Iterator<E_OUT> iterator() {
-            return AbstractPipeline.this.iterator();
-        }
-    }
-
     class ParallelImplPipelineHelper<P_IN> extends AbstractPipelineHelper<P_IN, E_OUT>
             implements ParallelPipelineHelper<P_IN, E_OUT> {
         final long targetSize;
@@ -450,22 +412,6 @@
         return ops;
     }
 
-    private boolean isPipelinePulled() {
-        if (iterator != null)
-            return true;
-
-        AbstractPipeline p = this.upstream;
-        while (p != null) {
-            if (p.iterator != null) {
-                // @@@ Should iterator() be called to aggressively update this and all upstream non-null iterators?
-                return true;
-            }
-            p = p.upstream;
-        }
-
-        return false;
-    }
-
     // Chaining and result methods
 
     /**
@@ -537,7 +483,7 @@
      * source. Otherwise, it's output shape corresponds to the output shape of the associated operation.
      */
     public StreamShape getOutputShape() {
-        return op == null ? shape : op.outputShape();
+        return op == null ? sourceState.getSourceShape() : op.outputShape();
     }
 
     /**
@@ -547,7 +493,7 @@
      * source. Otherwise, it's input shape corresponds to the input shape of the associated operation.
      */
     public StreamShape getInputShape() {
-        return op == null ? shape : op.inputShape();
+        return op == null ? sourceState.getSourceShape() : op.inputShape();
     }
 
     // BaseStream
@@ -555,36 +501,84 @@
     @Override
     @SuppressWarnings("unchecked")
     public Iterator<E_OUT> iterator() {
-        if (iterator == null) {
-            AbstractPipeline[] pipes = new AbstractPipeline[depth + 1];
-            AbstractPipeline p = this;
-            for (int i = depth; i >= 0; i--) {
-                pipes[i] = p;
-                p = p.upstream;
-            }
+        Iterator iterator = sourceState.iterator();
 
-            // Check if this is the first pull and if so get the source iterator
-            if (pipes[0].iterator == null) {
-                pipes[0].iterator = spliterator.iterator();
-            }
+        AbstractPipeline[] pipes = new AbstractPipeline[depth + 1];
+        AbstractPipeline p = this;
+        for (int i = depth; i >= 0; i--) {
+            pipes[i] = p;
+            p = p.upstream;
+        }
 
-            // Ensure the parallel flag is cleared, if set
-            int sourceFlags = StreamOpFlags.PARALLEL.clearFromFlags(this.sourceFlags);
-            int opsFlags = StreamOpFlags.INITIAL_OPS_VALUE;
-            for (int i = 1; i <= depth; i++) {
-                p = pipes[i];
-                if (p.iterator == null) {
-                    p.iterator = p.op.wrapIterator(StreamOpFlags.combineStreamFlags(sourceFlags, opsFlags),
-                                                   pipes[i - 1].iterator);
-                }
-                opsFlags = StreamOpFlags.combineOpFlags(pipes[i].op.getOpFlags(), opsFlags);
-            }
+        // Ensure the parallel flag is cleared, if set
+        int sourceFlags = StreamOpFlags.PARALLEL.clearFromFlags(sourceState.getSourceFlags());
+        int opsFlags = StreamOpFlags.INITIAL_OPS_VALUE;
+        for (int i = 1; i <= depth; i++) {
+            p = pipes[i];
+            iterator = p.op.wrapIterator(StreamOpFlags.combineStreamFlags(sourceFlags, opsFlags), iterator);
+            opsFlags = StreamOpFlags.combineOpFlags(pipes[i].op.getOpFlags(), opsFlags);
         }
+
         return iterator;
     }
 
     @Override
     public boolean isParallel() {
-        return StreamOpFlags.PARALLEL.isKnown(sourceFlags);
+        return StreamOpFlags.PARALLEL.isKnown(sourceState.getSourceFlags());
+    }
+
+    private interface SourceState<T> {
+        Spliterator<T> spliterator();
+        Iterator<T> iterator();
+        int getSourceFlags();
+        StreamShape getSourceShape();
+        boolean isConsumed();
+    }
+
+    private static class SpliteratorSourceState<T> implements SourceState<T> {
+        private final Spliterator<T> spliterator;
+        private final int sourceFlags;
+        private final StreamShape sourceShape;
+        private boolean consumed = false;
+        private Exception capturePoint;
+
+        private SpliteratorSourceState(Spliterator<T> spliterator, int flags, StreamShape sourceShape) {
+            this.spliterator = spliterator;
+            this.sourceFlags = flags;
+            this.sourceShape = sourceShape;
+        }
+
+        @Override
+        public Spliterator<T> spliterator() {
+            if (consumed)
+                throw new IllegalStateException("Stream source is already consumed", capturePoint);
+            consumed = true;
+            capturePoint = new RuntimeException("captured here");
+            return spliterator;
+        }
+
+        @Override
+        public Iterator<T> iterator() {
+            if (consumed)
+                throw new IllegalStateException("Stream source is already consumed", capturePoint);
+            consumed = true;
+            capturePoint = new RuntimeException("captured here");
+            return spliterator.iterator();
+        }
+
+        @Override
+        public int getSourceFlags() {
+            return sourceFlags;
+        }
+
+        @Override
+        public StreamShape getSourceShape() {
+            return sourceShape;
+        }
+
+        @Override
+        public boolean isConsumed() {
+            return consumed;
+        }
     }
 }
--- a/src/share/classes/java/util/stream/primitive/IntPipeline.java	Sun Nov 18 23:51:46 2012 -0500
+++ b/src/share/classes/java/util/stream/primitive/IntPipeline.java	Mon Nov 19 15:19:28 2012 +0100
@@ -33,8 +33,6 @@
 
 public class IntPipeline<E_IN> extends AbstractPipeline<E_IN, Integer> implements IntStream {
 
-    private IntIterator iterator;
-
     public IntPipeline(IntSpliterator source, int flags) {
         super(source, flags, StreamShapeFactory.INT_VALUE);
     }
@@ -47,10 +45,7 @@
 
     @Override
     public IntIterator iterator() {
-        if (iterator == null) {
-            iterator = Primitives.adapt(super.iterator());
-        }
-        return iterator;
+        return Primitives.adapt(super.iterator());
     }
 
     //
--- a/test-ng/tests/org/openjdk/tests/java/util/stream/OpTestCase.java	Sun Nov 18 23:51:46 2012 -0500
+++ b/test-ng/tests/org/openjdk/tests/java/util/stream/OpTestCase.java	Mon Nov 19 15:19:28 2012 +0100
@@ -321,9 +321,8 @@
         {
             // @@@ Using specific stream implementation
             AbstractPipeline<?, U> source = shape.stream(node);
-            // Force pull by requesting iterator
-            // @@@ seems fragile is there a better way?
-            source.iterator();
+            // Force pull mode
+            source = source.pipeline(new PullOnlyOp<U>(shape));
             R result = terminalF.apply((S_OUT) source);
             assertTrue(equalator.test(refResult, result), String.format("Single sequential pull: %s != %s", refResult, result));
         }
@@ -345,9 +344,8 @@
         // All sequential using pull
         {
             S_OUT source = streamF.apply(data.stream());
-            // Force pull by requesting iterator
-            // @@@ seems fragile is there a better way?
-            source.iterator();
+            // Force pull mode
+            source = (S_OUT) ((AbstractPipeline) source).pipeline(new PullOnlyOp<U>(shape));
             R result = terminalF.apply(source);
             assertTrue(equalator.test(refResult, result), String.format("All sequential pull: %s != %s", refResult, result));
         }
@@ -358,16 +356,6 @@
             assertTrue(equalator.test(refResult, result), String.format("All parallel: %s != %s", refResult, result));
         }
 
-        // All parallel using pull
-        {
-            S_OUT source = streamF.apply(data.parallel());
-            // Force pull by requesting iterator
-            // @@@ seems fragile is there a better way?
-            source.iterator();
-            R result = terminalF.apply(source);
-            assertTrue(equalator.test(refResult, result), String.format("All parallel pull: %s != %s", refResult, result));
-        }
-
         return refResult;
     }
 
@@ -470,4 +458,28 @@
             return chain(par(), terminal, ops);
         }
     }
+
+    @SuppressWarnings("rawtypes")
+    private class PullOnlyOp<T> implements IntermediateOp<T,T> {
+        private final StreamShape shape;
+
+        private PullOnlyOp(StreamShape shape) {
+            this.shape = shape;
+        }
+
+        @Override
+        public Iterator<T> wrapIterator(int flags, Iterator<T> in) {
+            return in;
+        }
+
+        @Override
+        public Sink<T> wrapSink(int flags, Sink<T> sink) {
+            throw new IllegalStateException("pull only!");
+        }
+
+        @Override
+        public int getOpFlags() {
+            return StreamOpFlags.IS_SHORT_CIRCUIT;
+        }
+    }
 }
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/test-ng/tests/org/openjdk/tests/java/util/stream/StreamReuseTest.java	Mon Nov 19 15:19:28 2012 +0100
@@ -0,0 +1,128 @@
+/*
+ * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.  Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+package org.openjdk.tests.java.util.stream;
+
+import org.openjdk.tests.java.util.stream.primitive.IntStreamTestData;
+import org.openjdk.tests.java.util.stream.primitive.IntStreamTestDataProvider;
+import org.testng.annotations.Test;
+
+import java.util.function.Function;
+import java.util.stream.BaseStream;
+import java.util.stream.Stream;
+import java.util.stream.primitive.IntStream;
+
+import static org.testng.Assert.fail;
+
+/**
+ * StreamReuseTest
+ *
+ * @author Brian Goetz
+ */
+@Test
+public class StreamReuseTest {
+
+    private <T, U, E, S extends BaseStream<E>, D extends OpTestCase.TestData<E, S>> void assertSecondFails(
+            D data,
+            Function<S, T> first,
+            Function<S, U> second,
+            Class<? extends Throwable> exception,
+            String text) {
+        S stream = data.stream();
+        T fr = first.apply(stream);
+        try {
+            U sr = second.apply(stream);
+            fail(text + " (seq)");
+        }
+        catch (Throwable e) {
+            if (exception.isAssignableFrom(e.getClass()))
+                return;
+            else if (e instanceof Error)
+                throw (Error) e;
+            else if (e instanceof RuntimeException)
+                throw (RuntimeException) e;
+            else
+                throw new AssertionError("Unexpected exception " + e.getClass(), e);
+        }
+
+        stream = data.parallel();
+        fr = first.apply(stream);
+        try {
+            U sr = second.apply(stream);
+            fail(text + " (par)");
+        }
+        catch (Throwable e) {
+            if (exception.isAssignableFrom(e.getClass()))
+                return;
+            else if (e instanceof Error)
+                throw (Error) e;
+            else if (e instanceof RuntimeException)
+                throw (RuntimeException) e;
+            else
+                throw new AssertionError("Unexpected exception " + e.getClass(), e);
+        }
+    }
+
+    // Stream
+
+    @Test(dataProvider = "opArrays", dataProviderClass = StreamTestDataProvider.class)
+    public void testTwoTerminals(String name, StreamTestData<Integer> data) {
+        assertSecondFails(data,
+                          (Stream<Integer> s) -> s.findFirst(), (Stream<Integer> s) -> s.findFirst(),
+                          IllegalStateException.class,
+                          "Stream findFirst / findFirst succeeded erroneously");
+    }
+
+    @Test(dataProvider = "opArrays", dataProviderClass = StreamTestDataProvider.class)
+    public void testTwoIterators(String name, StreamTestData<Integer> data) {
+        assertSecondFails(data,
+                          (Stream<Integer> s) -> s.iterator(), (Stream<Integer> s) -> s.iterator(),
+                          IllegalStateException.class,
+                          "Stream iterator / iterator succeeded erroneously");
+    }
+
+    @Test(dataProvider = "opArrays", dataProviderClass = StreamTestDataProvider.class)
+    public void testTerminalIterator(String name, StreamTestData<Integer> data) {
+        assertSecondFails(data,
+                          (Stream<Integer> s) -> s.iterator(), (Stream<Integer> s) -> s.findFirst(),
+                          IllegalStateException.class,
+                          "Stream findFirst / iterator succeeded erroneously");
+        assertSecondFails(data,
+                          (Stream<Integer> s) -> s.findFirst(), (Stream<Integer> s) -> s.iterator(),
+                          IllegalStateException.class,
+                          "Stream iterator / findFirst succeeded erroneously");
+    }
+
+    // IntStream
+
+    @Test(dataProvider = "IntStreamTestData", dataProviderClass = IntStreamTestDataProvider.class)
+    public void testTwoIterators(String name, IntStreamTestData data) {
+        assertSecondFails(data,
+                          (IntStream s) -> s.iterator(), (IntStream s) -> s.iterator(),
+                          IllegalStateException.class,
+                          "IntStream iterator / iterator succeeded erroneously");
+    }
+
+    // @@@ findFirst tests when implemented
+}
--- a/test-ng/tests/org/openjdk/tests/java/util/stream/StreamTestScenario.java	Sun Nov 18 23:51:46 2012 -0500
+++ b/test-ng/tests/org/openjdk/tests/java/util/stream/StreamTestScenario.java	Mon Nov 19 15:19:28 2012 +0100
@@ -67,28 +67,6 @@
         }
     },
 
-    // Wrap as stream, and iterate in mixed mode
-    STREAM_MIXED(false) {
-        <T, U, S_IN extends BaseStream<T>> void _run(OpTestCase.TestData<T, S_IN> data, Block<U> b, Function<S_IN, Stream<U>> m) {
-            Stream<U> stream = m.apply(data.stream());
-            Iterator<U> iter = stream.iterator();
-            if (iter.hasNext())
-                b.accept(iter.next());
-            stream.forEach(b);
-        }
-    },
-
-    // Wrap as two connected streams, request iterator for the upstream, and do forEach on the downstream
-    STREAM_MIXED_ITERATOR_FOR_EACH(false) {
-        <T, U, S_IN extends BaseStream<T>> void _run(OpTestCase.TestData<T, S_IN> data, Block<U> b, Function<S_IN, Stream<U>> m) {
-            AbstractPipeline<T, T> pipe1 = data.seq(new NoOp(data.getShape()));
-            Stream<U> pipe2 = m.apply((S_IN) pipe1);
-
-            pipe1.iterator();
-            pipe2.forEach(b);
-        }
-    },
-
     // Wrap as parallel stream + sequential
     PAR_STREAM_SEQUENTIAL_FOR_EACH(true) {
         <T, U, S_IN extends BaseStream<T>> void _run(OpTestCase.TestData<T, S_IN> data, Block<U> b, Function<S_IN, Stream<U>> m) {
@@ -134,74 +112,8 @@
         }
     },
 
-    // Wrap as parallel stream, and iterate in mixed mode
-    PAR_STREAM_ITERATOR_TO_ARRAY_MIXED(true) {
-        <T, U, S_IN extends BaseStream<T>> void _run(OpTestCase.TestData<T, S_IN> data, Block<U> b, Function<S_IN, Stream<U>> m) {
-            Stream<U> stream = m.apply(data.parallel());
-            Iterator<U> iter = stream.iterator();
-            if (iter.hasNext())
-                b.accept(iter.next());
-            for (Object t : stream.toArray())
-                b.accept((U) t);
-        }
-    },
-
-    // Wrap as two connected streams, request iterator for the upstream, and do forEach on the downstream
-    PAR_STREAM_MIXED_ITERATOR_TO_ARRAY(true) {
-        <T, U, S_IN extends BaseStream<T>> void _run(OpTestCase.TestData<T, S_IN> data, Block<U> b, Function<S_IN, Stream<U>> m) {
-            AbstractPipeline<T, T> pipe1 = data.seq(new NoOp(data.getShape()));
-            Stream<U> pipe2 = m.apply((S_IN) pipe1);
-
-            pipe1.iterator();
-            for (Object t : pipe2.toArray())
-                b.accept((U) t);
-        }
-    },
-
-    // Wrap as parallel stream, and iterate in mixed mode
-    PAR_STREAM_SEQUENTIAL_MIXED(true) {
-        <T, U, S_IN extends BaseStream<T>> void _run(OpTestCase.TestData<T, S_IN> data, Block<U> b, Function<S_IN, Stream<U>> m) {
-            Stream<U> stream = m.apply(data.parallel());
-            Iterator<U> iter = stream.iterator();
-            if (iter.hasNext())
-                b.accept(iter.next());
-            stream.sequential().forEach(b);
-        }
-    },
-
-    // More ways to iterate the PSS: iterate result of op
-    // Extends testing to test whether computation happens in- or out-of-thread
     ;
 
-    private static class NoOp<T> implements IntermediateOp<T, T> {
-
-        private final StreamShape shape;
-
-        private NoOp(StreamShape shape) {
-            this.shape = shape;
-        }
-
-        @Override
-        public StreamShape inputShape() {
-            return shape;
-        }
-
-        @Override
-        public StreamShape outputShape() {
-            return shape;
-        }
-
-        @Override
-        public Iterator<T> wrapIterator(int flags, Iterator<T> in) {
-            return in;
-        }
-
-        @Override
-        public Sink<T> wrapSink(int flags, Sink<T> sink) {
-            return sink;
-        }
-    }
-
     private boolean isParallel;
 
     StreamTestScenario(boolean isParallel) {
--- a/test-ng/tests/org/openjdk/tests/java/util/stream/op/PrimitiveOpsTests.java	Sun Nov 18 23:51:46 2012 -0500
+++ b/test-ng/tests/org/openjdk/tests/java/util/stream/op/PrimitiveOpsTests.java	Mon Nov 19 15:19:28 2012 +0100
@@ -52,15 +52,6 @@
         Assert.assertEquals(sum, 20);
     }
 
-    public void testSumPull() {
-        IntStream s = Primitives.range(1, 10).filter(i -> i % 2 == 0);
-
-        IntIterator ii = s.iterator();
-        int i = ii.nextInt();
-
-        Assert.assertEquals(s.sum() + i, 20);
-    }
-
     public void testTee() {
         int[] teeSum = new int[1];
         int sum = Primitives.range(1, 10).filter(i -> i % 2 == 0).tee(i -> { teeSum[0] = teeSum[0] + i; }).sum();
--- a/test-ng/tests/org/openjdk/tests/java/util/stream/primitive/IntStreamTestScenario.java	Sun Nov 18 23:51:46 2012 -0500
+++ b/test-ng/tests/org/openjdk/tests/java/util/stream/primitive/IntStreamTestScenario.java	Mon Nov 19 15:19:28 2012 +0100
@@ -60,26 +60,6 @@
         }
     },
 
-    STREAM_MIXED(false) {
-        <T, S_IN extends BaseStream<T>> void _run(OpTestCase.TestData<T, S_IN> data, IntBlock b, Function<S_IN, IntStream> m) {
-            IntStream stream = m.apply(data.stream());
-            IntIterator iter = stream.iterator();
-            if (iter.hasNext())
-                b.applyInt(iter.nextInt());
-            stream.forEach(b);
-        }
-    },
-
-    STREAM_MIXED_ITERATOR_FOR_EACH(false) {
-        <T, S_IN extends BaseStream<T>> void _run(OpTestCase.TestData<T, S_IN> data, IntBlock b, Function<S_IN, IntStream> m) {
-            AbstractPipeline<?, ?> pipe1 = data.seq(new NoOp(data.getShape()));
-            IntStream pipe2 = m.apply((S_IN) pipe1);
-
-            pipe1.iterator();
-            pipe2.forEach(b);
-        }
-    },
-
     PAR_STREAM_SEQUENTIAL_FOR_EACH(true) {
         <T, S_IN extends BaseStream<T>> void _run(OpTestCase.TestData<T, S_IN> data, IntBlock b, Function<S_IN, IntStream> m) {
             m.apply(data.parallel()).sequential().forEach(b);
@@ -113,70 +93,8 @@
         }
     },
 
-    PAR_STREAM_ITERATOR_TO_ARRAY_MIXED(true) {
-        <T, S_IN extends BaseStream<T>> void _run(OpTestCase.TestData<T, S_IN> data, IntBlock b, Function<S_IN, IntStream> m) {
-            IntStream stream = m.apply(data.parallel());
-            IntIterator iter = stream.iterator();
-            if (iter.hasNext())
-                b.applyInt(iter.nextInt());
-            for (int t : stream.toArray())
-                b.applyInt(t);
-        }
-    },
-
-    PAR_STREAM_MIXED_ITERATOR_TO_ARRAY(true) {
-        <T, S_IN extends BaseStream<T>> void _run(OpTestCase.TestData<T, S_IN> data, IntBlock b, Function<S_IN, IntStream> m) {
-            AbstractPipeline<?, ?> pipe1 = data.par(new NoOp(data.getShape()));
-            IntStream pipe2 = m.apply((S_IN) pipe1);
-
-            pipe1.iterator();
-            for (int t : pipe2.toArray())
-                b.applyInt(t);
-        }
-    },
-
-    // Wrap as parallel stream, and iterate in mixed mode
-    PAR_STREAM_SEQUENTIAL_MIXED(true) {
-        <T, S_IN extends BaseStream<T>> void _run(OpTestCase.TestData<T, S_IN> data, IntBlock b, Function<S_IN, IntStream> m) {
-            IntStream stream = m.apply(data.parallel());
-            IntIterator iter = stream.iterator();
-            if (iter.hasNext())
-                b.applyInt(iter.nextInt());
-            stream.sequential().forEach(b);
-        }
-    },
-
     ;
 
-    private static class NoOp<T> implements IntermediateOp<T, T> {
-
-        private final StreamShape shape;
-
-        private NoOp(StreamShape shape) {
-            this.shape = shape;
-        }
-
-        @Override
-        public StreamShape inputShape() {
-            return shape;
-        }
-
-        @Override
-        public StreamShape outputShape() {
-            return shape;
-        }
-
-        @Override
-        public Iterator<T> wrapIterator(int flags, Iterator<T> in) {
-            return in;
-        }
-
-        @Override
-        public Sink<T> wrapSink(int flags, Sink<T> sink) {
-            return sink;
-        }
-    }
-
     private boolean isParallel;
 
     IntStreamTestScenario(boolean isParallel) {