changeset 6410:e80b94da6538

- OpTestCase.TestData generic to the input stream type. - exercising ops generic to the input and output stream. - expected result can be declared to override inducing the expected result.
author psandoz
date Thu, 15 Nov 2012 17:55:02 +0100
parents 5cdf42e26646
children ae4e5994cbb4
files test-ng/tests/org/openjdk/tests/java/util/LambdaTestHelpers.java test-ng/tests/org/openjdk/tests/java/util/streams/OpTestCase.java test-ng/tests/org/openjdk/tests/java/util/streams/StreamIntermediateOpTestScenario.java test-ng/tests/org/openjdk/tests/java/util/streams/StreamTestData.java test-ng/tests/org/openjdk/tests/java/util/streams/ops/ConcatOpTest.java test-ng/tests/org/openjdk/tests/java/util/streams/ops/FilterOpTest.java test-ng/tests/org/openjdk/tests/java/util/streams/ops/FindFirstOpTest.java test-ng/tests/org/openjdk/tests/java/util/streams/ops/MapOpTest.java test-ng/tests/org/openjdk/tests/java/util/streams/ops/ReduceByOpTest.java test-ng/tests/org/openjdk/tests/java/util/streams/ops/SliceOpTest.java test-ng/tests/org/openjdk/tests/java/util/streams/ops/TeeOpTest.java test-ng/tests/org/openjdk/tests/java/util/streams/ops/UniqOpTest.java test-ng/tests/org/openjdk/tests/java/util/streams/primitives/IntStreamIntermediateOpTestScenario.java test-ng/tests/org/openjdk/tests/java/util/streams/primitives/IntStreamTestData.java
diffstat 14 files changed, 350 insertions(+), 289 deletions(-) [+]
line wrap: on
line diff
--- a/test-ng/tests/org/openjdk/tests/java/util/LambdaTestHelpers.java	Thu Nov 15 17:35:31 2012 +0100
+++ b/test-ng/tests/org/openjdk/tests/java/util/LambdaTestHelpers.java	Thu Nov 15 17:55:02 2012 +0100
@@ -239,14 +239,6 @@
         assertContents(actual, Arrays.asList(expected).iterator());
     }
 
-    public static void assertStreamContents(List<Integer> sourceData, UnaryOperator<Stream<Integer>> mapper, List<Integer> expected) {
-        Integer[] source = sourceData.toArray(new Integer[sourceData.size()]);
-
-        assertContents(mapper.operate(Arrays.stream(source)).iterator(), expected.iterator());
-        assertContents(mapper.operate(Arrays.parallel(source)).sequential().iterator(), expected.iterator());
-        assertContents(mapper.operate(Arrays.parallel(source)).sequential().into(new ArrayList<Integer>()).iterator(), expected.iterator());
-    }
-
     public static <T> boolean equalsContentsUnordered(Iterable<T> a, Iterable<T> b) {
         Set<T> sa = new HashSet<>();
         for (T t : a) {
--- a/test-ng/tests/org/openjdk/tests/java/util/streams/OpTestCase.java	Thu Nov 15 17:35:31 2012 +0100
+++ b/test-ng/tests/org/openjdk/tests/java/util/streams/OpTestCase.java	Thu Nov 15 17:55:02 2012 +0100
@@ -35,6 +35,7 @@
 import java.util.functions.BiPredicate;
 import java.util.functions.Block;
 import java.util.functions.Factory;
+import java.util.functions.Mapper;
 import java.util.streams.*;
 import java.util.streams.ops.*;
 import java.util.streams.primitives.IntStream;
@@ -65,76 +66,98 @@
 
         boolean isParallel();
 
-        default boolean isApplicable(IntermediateOp[] ops) {
-            return true;
-        }
-
-        abstract <T> void run(TestData<T> data, Block b, IntermediateOp[] ops);
+        abstract <T, U, S_IN extends BaseStream<T>, S_OUT extends BaseStream<U>> void run(TestData<T, S_IN> data, Block<U> b, Mapper<S_OUT, S_IN> m);
     }
 
-    @SuppressWarnings("rawtypes")
-    protected <T, U> Node<U> exerciseOps(TestData<T> data, IntermediateOp... ops) {
-        return withData(data).<U>using(ops).exercise();
+    @SuppressWarnings("unchecked")
+    public <T, U, S_IN extends BaseStream<T>> Node<U> exerciseOps(TestData<T, S_IN> data, IntermediateOp... ops) {
+        return (Node<U>) withData(data).
+                using(ops).
+                exercise();
     }
 
-    public <T> IntermediateOpDataBuilder<T> withData(TestData<T> data) {
+    public <T, U, S_IN extends BaseStream<T>, S_OUT extends BaseStream<U>> Node<U> exerciseOps(TestData<T, S_IN> data, Mapper<S_OUT, S_IN> m) {
+        return withData(data).
+                using(m).
+                exercise();
+    }
+
+    public <T, U, S_OUT extends BaseStream<U>> Node<U> exerciseOps(Collection<T> data, Mapper<S_OUT, Stream<T>> m) {
+        return withData(new StreamTestData.CollectionData<>("Collection of type " + data.getClass().getName(), data)).
+                using(m).
+                exercise();
+    }
+
+    public <T, U, S_OUT extends BaseStream<U>, I extends Iterable<U> & Sized> Node<U> exerciseOps(Collection<T> data, Mapper<S_OUT, Stream<T>> m, I expected) {
+        return withData(new StreamTestData.CollectionData<>("Collection of type " + data.getClass().getName(), data)).
+                using(m).
+                expectedResult(expected).
+                exercise();
+    }
+
+    public <T, S_IN extends BaseStream<T>> IntermediateOpDataBuilder<T, S_IN> withData(TestData<T, S_IN> data) {
         Objects.requireNonNull(data);
         return new IntermediateOpDataBuilder<>(data);
     }
 
     @SuppressWarnings("rawtypes")
-    public class IntermediateOpDataBuilder<T> {
-        final TestData<T> data;
+    public class IntermediateOpDataBuilder<T, S_IN extends BaseStream<T>> {
+        final TestData<T, S_IN> data;
 
-
-        private IntermediateOpDataBuilder(TestData<T> data) {
+        private IntermediateOpDataBuilder(TestData<T, S_IN> data) {
             this.data = Objects.requireNonNull(data);
         }
 
-        public <U> IntermediateOpTestBuilder<T, U> using(IntermediateOp... ops) {
-            return new IntermediateOpTestBuilder<>(data, () -> ops);
+        public <U, S_OUT extends BaseStream<U>> IntermediateOpTestBuilder<T, U, S_IN, S_OUT> using(IntermediateOp... ops) {
+            return usingOpsFactory(() -> ops);
         }
 
-        public <U> IntermediateOpTestBuilder<T, U> usingOpFactory(Factory<IntermediateOp> fop) {
-            return new IntermediateOpTestBuilder<>(data, () -> new IntermediateOp[] { fop.make() });
+        public <U, S_OUT extends BaseStream<U>> IntermediateOpTestBuilder<T, U, S_IN, S_OUT> usingOpFactory(Factory<IntermediateOp> fop) {
+            return usingOpsFactory(() -> new IntermediateOp[] { fop.make() });
         }
 
-        public <U> IntermediateOpTestBuilder<T, U> usingOpsFactory(Factory<IntermediateOp[]> fops) {
-            return new IntermediateOpTestBuilder<>(data, fops);
+        @SuppressWarnings("unchecked")
+        public <U, S_OUT extends BaseStream<U>> IntermediateOpTestBuilder<T, U, S_IN, S_OUT> usingOpsFactory(Factory<IntermediateOp[]> fops) {
+            return using((S_IN s) -> (S_OUT) chain((AbstractPipeline<?, T>)s, fops.make()));
         }
+
+        public <U, S_OUT extends BaseStream<U>> IntermediateOpTestBuilder<T, U, S_IN, S_OUT> using(Mapper<S_OUT, S_IN> m) {
+            return new IntermediateOpTestBuilder<>(data, m);
+        }
+
     }
 
     @SuppressWarnings("rawtypes")
-    public class IntermediateOpTestBuilder<T, U> {
-        final TestData<T> data;
+    public class IntermediateOpTestBuilder<T, U, S_IN extends BaseStream<T>, S_OUT extends BaseStream<U>> {
+        final TestData<T, S_IN> data;
 
-        final Factory<IntermediateOp[]> fops;
+        final Mapper<S_OUT, S_IN> m;
 
         final StreamShape shape;
 
         Set<IntermediateOpTestScenario> testSet = new HashSet<>();
 
-        @SuppressWarnings("unchecked")
-        Block<TestData<T>> before = LambdaTestHelpers.bEmpty;
+        Node<U> refResult;
 
         @SuppressWarnings("unchecked")
-        Block<TestData<T>> after = LambdaTestHelpers.bEmpty;
+        Block<TestData<T, S_IN>> before = LambdaTestHelpers.bEmpty;
+
+        @SuppressWarnings("unchecked")
+        Block<TestData<T, S_IN>> after = LambdaTestHelpers.bEmpty;
 
         BiPredicate<Iterable<U>, Iterable<U>> sequentialEqualator = Objects::equals;
 
         BiPredicate<Iterable<U>, Iterable<U>> parallelEqualator = Objects::equals;
 
-        private IntermediateOpTestBuilder(TestData<T> data, Factory<IntermediateOp[]> fops) {
+        private IntermediateOpTestBuilder(TestData<T, S_IN> data, Mapper<S_OUT, S_IN> m) {
             this.data = data;
 
-            this.fops = Objects.requireNonNull(fops);
+            this.m = Objects.requireNonNull(m);
 
-            IntermediateOp[] ops = fops.make();
-            this.shape = (ops.length > 0) ? ops[ops.length - 1].outputShape() : data.getShape();
+            this.shape = ((AbstractPipeline<?, U>) m.map(data.stream())).getOutputShape();
 
-            // Have to initiate from the output shape of the last operation or the shape of the test data
-            // if there are not operations
-            // This means the operations are required first rather than last
+            // Have to initiate from the output shape of the last stream
+            // This means the stream mapper is required first rather than last
             testSet.addAll(testScenarios.get(shape.getStreamType()));
         }
 
@@ -144,22 +167,32 @@
 
         //
 
-        public IntermediateOpTestBuilder<T, U> before(Block<TestData<T>> before) {
+        @SuppressWarnings("unchecked")
+        public <I extends Iterable<U> & Sized> IntermediateOpTestBuilder<T, U, S_IN, S_OUT> expectedResult(I expectedResult) {
+            NodeBuilder<U> resultBuilder = shape.makeNodeBuilder(expectedResult.size());
+            resultBuilder.begin(expectedResult.size());
+            expectedResult.forEach(resultBuilder);
+            resultBuilder.end();
+            this.refResult = resultBuilder.build();
+            return this;
+        }
+
+        public IntermediateOpTestBuilder<T, U, S_IN, S_OUT> before(Block<TestData<T, S_IN>> before) {
             this.before = Objects.requireNonNull(before);
             return this;
         }
 
-        public IntermediateOpTestBuilder<T, U> after(Block<TestData<T>> after) {
+        public IntermediateOpTestBuilder<T, U, S_IN, S_OUT> after(Block<TestData<T, S_IN>> after) {
             this.after = Objects.requireNonNull(after);
             return this;
         }
 
         @SuppressWarnings("unchecked")
-        public IntermediateOpTestBuilder<T, U> without(IntermediateOpTestScenario... tests) {
+        public IntermediateOpTestBuilder<T, U, S_IN, S_OUT> without(IntermediateOpTestScenario... tests) {
             return without(Arrays.asList(tests));
         }
 
-        public IntermediateOpTestBuilder<T, U> without(Collection<? extends IntermediateOpTestScenario> tests) {
+        public IntermediateOpTestBuilder<T, U, S_IN, S_OUT> without(Collection<? extends IntermediateOpTestScenario> tests) {
             for (IntermediateOpTestScenario ts : tests) {
                 if (ts.getShape().getStreamType() == shape.getStreamType()) {
                     testSet.remove(ts);
@@ -174,11 +207,11 @@
         }
 
         @SuppressWarnings("unchecked")
-        public IntermediateOpTestBuilder<T, U> with(IntermediateOpTestScenario... tests) {
+        public IntermediateOpTestBuilder<T, U, S_IN, S_OUT> with(IntermediateOpTestScenario... tests) {
             return with(Arrays.asList(tests));
         }
 
-        public IntermediateOpTestBuilder<T, U> with(Collection<? extends IntermediateOpTestScenario> tests) {
+        public IntermediateOpTestBuilder<T, U, S_IN, S_OUT> with(Collection<? extends IntermediateOpTestScenario> tests) {
             testSet = new HashSet<>();
 
             for (IntermediateOpTestScenario ts : tests) {
@@ -194,12 +227,12 @@
             return this;
         }
 
-        public IntermediateOpTestBuilder<T, U> sequentialEqualator(BiPredicate<Iterable<U>, Iterable<U>> equalator) {
+        public IntermediateOpTestBuilder<T, U, S_IN, S_OUT> sequentialEqualator(BiPredicate<Iterable<U>, Iterable<U>> equalator) {
             this.sequentialEqualator = equalator;
             return this;
         }
 
-        public IntermediateOpTestBuilder<T, U> parallelEqualator(BiPredicate<Iterable<U>, Iterable<U>> equalator) {
+        public IntermediateOpTestBuilder<T, U, S_IN, S_OUT> parallelEqualator(BiPredicate<Iterable<U>, Iterable<U>> equalator) {
             this.parallelEqualator = equalator;
             return this;
         }
@@ -212,28 +245,31 @@
     }
 
     @SuppressWarnings({"rawtypes", "unchecked"})
-    protected static <T, U> Node<U> exerciseOps(IntermediateOpTestBuilder<T, U> b) {
-        b.before.apply(b.data);
-        // Get the reference result by sequentially collecting the output as a Node
-        Node<U> refResult = (Node<U>) b.data.seq(b.fops.make()).collectOutput().flatten();
-        b.after.apply(b.data);
+    protected static <T, U, S_IN extends BaseStream<T>, S_OUT extends BaseStream<U>> Node<U> exerciseOps(IntermediateOpTestBuilder<T, U, S_IN, S_OUT> b) {
+        Node<U> refResult;
+        if (b.refResult != null) {
+            refResult = b.refResult;
+        } else {
+            // Induce the reference result
+            b.before.apply(b.data);
+            S_OUT sOut = b.m.map(b.data.stream());
+            refResult = ((AbstractPipeline<?, U>) sOut).collectOutput().flatten();
+            b.after.apply(b.data);
+        }
 
         for (IntermediateOpTestScenario test : b.testSet) {
-            IntermediateOp[] ops = b.fops.make();
-            if (test.isApplicable(ops)) {
-                b.before.apply(b.data);
+            b.before.apply(b.data);
 
-                NodeBuilder<U> resultBuilder = b.shape.makeNodeBuilder(-1);
-                resultBuilder.begin(-1);
-                test.run(b.data, resultBuilder, ops);
-                resultBuilder.end();
-                Node<U> result = resultBuilder.build();
+            NodeBuilder<U> resultBuilder = b.shape.makeNodeBuilder(-1);
+            resultBuilder.begin(-1);
+            test.run(b.data, resultBuilder, b.m);
+            resultBuilder.end();
+            Node<U> result = resultBuilder.build();
 
-                assertTrue(b.getEqualator(test).test(result, refResult),
-                           String.format("%s %s %s: %s != %s", b.data.toString(), test, Arrays.toString(ops), refResult, result));
+            assertTrue(b.getEqualator(test).test(result, refResult),
+                       String.format("%s %s: %s != %s", b.data.toString(), test, refResult, result));
 
-                b.after.apply(b.data);
-            }
+            b.after.apply(b.data);
         }
 
         return refResult;
@@ -242,17 +278,17 @@
     // Exercise terminal operations
 
     @SuppressWarnings("rawtypes")
-    protected <T, U> U exerciseOps(TestData<T> data, TerminalOp<T, U> terminal) {
+    protected <T, U, S_IN extends BaseStream<T>> U exerciseOps(TestData<T, S_IN> data, TerminalOp<T, U> terminal) {
         return exerciseOps(data, terminal, new IntermediateOp[0]);
     }
 
     @SuppressWarnings("rawtypes")
-    protected <T, U> U exerciseOps(TestData<T> data, TerminalOp<T, U> terminal, IntermediateOp... ops) {
+    protected <T, U, S_IN extends BaseStream<T>> U exerciseOps(TestData<T, S_IN> data, TerminalOp<T, U> terminal, IntermediateOp... ops) {
         return exerciseOps(data, (u, v) -> Objects.equals(u,v), terminal, ops);
     }
 
     @SuppressWarnings({ "rawtypes", "unchecked" })
-    protected static <T, U> U exerciseOps(TestData<T> data,
+    protected static <T, U, S_IN extends BaseStream<T>> U exerciseOps(TestData<T, S_IN> data,
                                           BiPredicate<U, U> equalator,
                                           TerminalOp<T, U> terminalOp,
                                           IntermediateOp... ops) {
@@ -344,7 +380,7 @@
     // Test data
 
     @SuppressWarnings({"rawtypes", "unchecked"})
-    public static interface TestData<T> extends Iterable<T>, Sized {
+    public static interface TestData<T, S extends BaseStream<T>> extends Iterable<T>, Streamable<S>, Sized {
 
         // @@@ This is not used, should it be removed?
         Spliterator<T> spliterator();
@@ -360,11 +396,25 @@
             return target;
         }
 
+        // Streamable
+
+        @Override
+        S stream();
+
+        @Override
+        S parallel();
+
         // Source
 
-        AbstractPipeline<?, T> seq();
+        @SuppressWarnings({ "rawtypes", "unchecked" })
+        default AbstractPipeline<?, T> seq() {
+            return (AbstractPipeline<?, T>) stream();
+        }
 
-        AbstractPipeline<?, T> par();
+        @SuppressWarnings({ "rawtypes", "unchecked" })
+        default AbstractPipeline<?, T> par() {
+            return (AbstractPipeline<?, T>) stream();
+        }
 
         // Intermediate
 
@@ -394,11 +444,11 @@
             return chain(par(), op);
         }
 
-        default <U> U seq(TerminalOp<T, U> terminal, IntermediateOp... ops) {
+        default <U> U seq(TerminalOp<?, U> terminal, IntermediateOp... ops) {
             return chain(seq(), terminal, ops);
         }
 
-        default <U> U par(TerminalOp<T, U> terminal, IntermediateOp... ops) {
+        default <U> U par(TerminalOp<?, U> terminal, IntermediateOp... ops) {
             return chain(par(), terminal, ops);
         }
     }
--- a/test-ng/tests/org/openjdk/tests/java/util/streams/StreamIntermediateOpTestScenario.java	Thu Nov 15 17:35:31 2012 +0100
+++ b/test-ng/tests/org/openjdk/tests/java/util/streams/StreamIntermediateOpTestScenario.java	Thu Nov 15 17:55:02 2012 +0100
@@ -27,6 +27,7 @@
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.functions.Block;
+import java.util.functions.Mapper;
 import java.util.streams.*;
 import java.util.streams.ops.FlagDeclaringOp;
 import java.util.streams.ops.IntermediateOp;
@@ -35,15 +36,15 @@
 public enum StreamIntermediateOpTestScenario implements OpTestCase.IntermediateOpTestScenario {
 
     STREAM_FOR_EACH(false) {
-        public <T> void run(OpTestCase.TestData<T> data, Block b, IntermediateOp[] ops) {
-            stream(data.seq(ops)).forEach(b);
+        <T, U, S_IN extends BaseStream<T>> void _run(OpTestCase.TestData<T, S_IN> data, Block<U> b, Mapper<Stream<U>, S_IN> m) {
+            m.map(data.stream()).forEach(b);
         }
     },
 
     // Wrap as stream and into a list
     STREAM_INTO(false) {
-        public <T> void run(OpTestCase.TestData<T> data, Block b, IntermediateOp[] ops) {
-            for (Object t : stream(data.seq(ops)).into(new ArrayList())) {
+        <T, U, S_IN extends BaseStream<T>> void _run(OpTestCase.TestData<T, S_IN> data, Block<U> b, Mapper<Stream<U>, S_IN> m) {
+            for (U t : m.map(data.stream()).into(new ArrayList<U>())) {
                 b.apply(t);
             }
         }
@@ -51,26 +52,26 @@
 
     // Wrap as stream and into a list
     STREAM_TO_ARRAY(false) {
-        public <T> void run(OpTestCase.TestData<T> data, Block b, IntermediateOp[] ops) {
-            for (Object t : stream(data.seq(ops)).toArray()) {
-                b.apply(t);
+        <T, U, S_IN extends BaseStream<T>> void _run(OpTestCase.TestData<T, S_IN> data, Block<U> b, Mapper<Stream<U>, S_IN> m) {
+            for (Object t : m.map(data.stream()).toArray()) {
+                b.apply((U) t);
             }
         }
     },
 
     // Wrap as stream, and iterate in pull mode
     STREAM_ITERATOR(false) {
-        public <T> void run(OpTestCase.TestData<T> data, Block b, IntermediateOp[] ops) {
-            for (Iterator<?> seqIter = data.seq(ops).iterator(); seqIter.hasNext(); )
+        <T, U, S_IN extends BaseStream<T>> void _run(OpTestCase.TestData<T, S_IN> data, Block<U> b, Mapper<Stream<U>, S_IN> m) {
+            for (Iterator<U> seqIter = m.map(data.stream()).iterator(); seqIter.hasNext(); )
                 b.apply(seqIter.next());
         }
     },
 
     // Wrap as stream, and iterate in mixed mode
     STREAM_MIXED(false) {
-        public <T> void run(OpTestCase.TestData<T> data, Block b, IntermediateOp[] ops) {
-            Stream<?> stream = stream(data.seq(ops));
-            Iterator<?> iter = stream.iterator();
+        <T, U, S_IN extends BaseStream<T>> void _run(OpTestCase.TestData<T, S_IN> data, Block<U> b, Mapper<Stream<U>, S_IN> m) {
+            Stream<U> stream = m.map(data.stream());
+            Iterator<U> iter = stream.iterator();
             if (iter.hasNext())
                 b.apply(iter.next());
             stream.forEach(b);
@@ -79,79 +80,89 @@
 
     // Wrap as two connected streams, request iterator for the upstream, and do forEach on the downstream
     STREAM_MIXED_ITERATOR_FOR_EACH(false) {
-        public <T> void run(OpTestCase.TestData<T> data, Block b, IntermediateOp[] ops) {
-            AbstractPipeline<?, ?> pipe1 = data.seq(new FlagDeclaringOp(0));
-            AbstractPipeline<?, ?> pipe2 = OpTestCase.chain(pipe1, ops);
+        <T, U, S_IN extends BaseStream<T>> void _run(OpTestCase.TestData<T, S_IN> data, Block<U> b, Mapper<Stream<U>, S_IN> m) {
+            AbstractPipeline<T, T> pipe1 = data.seq(new NoOp(data.getShape()));
+            Stream<U> pipe2 = m.map((S_IN) pipe1);
 
             pipe1.iterator();
-            stream(pipe2).forEach(b);
+            pipe2.forEach(b);
         }
     },
 
     // Wrap as parallel stream + sequential
     PAR_STREAM_SEQUENTIAL_FOR_EACH(true) {
-        public <T> void run(OpTestCase.TestData<T> data, Block b, IntermediateOp[] ops) {
-            stream(data.par(ops)).sequential().forEach(b);
+        <T, U, S_IN extends BaseStream<T>> void _run(OpTestCase.TestData<T, S_IN> data, Block<U> b, Mapper<Stream<U>, S_IN> m) {
+            m.map(data.parallel()).sequential().forEach(b);
         }
     },
 
     // Wrap as parallel stream + toArray
     PAR_STREAM_TO_ARRAY(true) {
-        public <T> void run(OpTestCase.TestData<T> data, Block b, IntermediateOp[] ops) {
-            for (Object t : stream(data.par(ops)).toArray())
-                b.apply(t);
+        <T, U, S_IN extends BaseStream<T>> void _run(OpTestCase.TestData<T, S_IN> data, Block<U> b, Mapper<Stream<U>, S_IN> m) {
+            for (Object t : m.map(data.parallel()).toArray())
+                b.apply((U) t);
         }
     },
 
     // Wrap as parallel stream + toArray and clear SIZED flag
     PAR_STREAM_TO_ARRAY_CLEAR_SIZED(true) {
-        public <T> void run(OpTestCase.TestData<T> data, Block b, IntermediateOp[] ops) {
-            AbstractPipeline<?, ?> pipe1 = data.seq(new FlagDeclaringOp(StreamOpFlags.NOT_SIZED));
-            AbstractPipeline<?, ?> pipe2 = OpTestCase.chain(pipe1, ops);
+        <T, U, S_IN extends BaseStream<T>> void _run(OpTestCase.TestData<T, S_IN> data, Block<U> b, Mapper<Stream<U>, S_IN> m) {
+            AbstractPipeline<?, ?> pipe1 = data.par(new FlagDeclaringOp(StreamOpFlags.NOT_SIZED) {
+                @Override
+                public StreamShape outputShape() {
+                    return data.getShape();
+                }
 
-            for (Object t : stream(pipe2).toArray())
-                b.apply(t);
+                @Override
+                public StreamShape inputShape() {
+                    return data.getShape();
+                }
+            });
+            Stream<U> pipe2 = m.map((S_IN) pipe1);
+
+            for (Object t : pipe2.toArray())
+                b.apply((U) t);
         }
     },
 
     // Wrap as parallel stream + into
     PAR_STREAM_SEQUENTIAL_INTO(true) {
-        public <T> void run(OpTestCase.TestData<T> data, Block b, IntermediateOp[] ops) {
-            ArrayList list = stream(data.par(ops)).sequential().into(new ArrayList());
-            for (Object u : list)
+        <T, U, S_IN extends BaseStream<T>> void _run(OpTestCase.TestData<T, S_IN> data, Block<U> b, Mapper<Stream<U>, S_IN> m) {
+            ArrayList<U> list = m.map(data.parallel()).sequential().into(new ArrayList<U>());
+            for (U u : list)
                 b.apply(u);
         }
     },
 
     // Wrap as parallel stream, and iterate in mixed mode
     PAR_STREAM_ITERATOR_TO_ARRAY_MIXED(true) {
-        public <T> void run(OpTestCase.TestData<T> data, Block b, IntermediateOp[] ops) {
-            Stream<?> stream = stream(data.par(ops));
-            Iterator<?> iter = stream.iterator();
+        <T, U, S_IN extends BaseStream<T>> void _run(OpTestCase.TestData<T, S_IN> data, Block<U> b, Mapper<Stream<U>, S_IN> m) {
+            Stream<U> stream = m.map(data.parallel());
+            Iterator<U> iter = stream.iterator();
             if (iter.hasNext())
                 b.apply(iter.next());
             for (Object t : stream.toArray())
-                b.apply(t);
+                b.apply((U) t);
         }
     },
 
     // Wrap as two connected streams, request iterator for the upstream, and do forEach on the downstream
     PAR_STREAM_MIXED_ITERATOR_TO_ARRAY(true) {
-        public <T> void run(OpTestCase.TestData<T> data, Block b, IntermediateOp[] ops) {
-            AbstractPipeline<?, ?> pipe1 = data.par(new FlagDeclaringOp(0));
-            AbstractPipeline<?, ?> pipe2 = OpTestCase.chain(pipe1, ops);
+        <T, U, S_IN extends BaseStream<T>> void _run(OpTestCase.TestData<T, S_IN> data, Block<U> b, Mapper<Stream<U>, S_IN> m) {
+            AbstractPipeline<T, T> pipe1 = data.seq(new NoOp(data.getShape()));
+            Stream<U> pipe2 = m.map((S_IN) pipe1);
 
             pipe1.iterator();
-            for (Object t : stream(pipe2).toArray())
-                b.apply(t);
+            for (Object t : pipe2.toArray())
+                b.apply((U) t);
         }
     },
 
     // Wrap as parallel stream, and iterate in mixed mode
     PAR_STREAM_SEQUENTIAL_MIXED(true) {
-        public <T> void run(OpTestCase.TestData<T> data, Block b, IntermediateOp[] ops) {
-            Stream<?> stream = stream(data.par(ops));
-            Iterator<?> iter = stream.iterator();
+        <T, U, S_IN extends BaseStream<T>> void _run(OpTestCase.TestData<T, S_IN> data, Block<U> b, Mapper<Stream<U>, S_IN> m) {
+            Stream<U> stream = m.map(data.parallel());
+            Iterator<U> iter = stream.iterator();
             if (iter.hasNext())
                 b.apply(iter.next());
             stream.sequential().forEach(b);
@@ -162,6 +173,35 @@
     // Extends testing to test whether computation happens in- or out-of-thread
     ;
 
+    private static class NoOp<T> implements IntermediateOp<T, T> {
+
+        private final StreamShape shape;
+
+        private NoOp(StreamShape shape) {
+            this.shape = shape;
+        }
+
+        @Override
+        public StreamShape inputShape() {
+            return shape;
+        }
+
+        @Override
+        public StreamShape outputShape() {
+            return shape;
+        }
+
+        @Override
+        public Iterator<T> wrapIterator(int flags, Iterator<T> in) {
+            return in;
+        }
+
+        @Override
+        public Sink<T> wrapSink(int flags, Sink<T> sink) {
+            return sink;
+        }
+    }
+
     private boolean isParallel;
 
     StreamIntermediateOpTestScenario(boolean isParallel) {
@@ -176,7 +216,9 @@
         return isParallel;
     }
 
-    <T> Stream<T> stream(AbstractPipeline<?, T> ap) {
-        return (Stream<T>)ap;
+    public <T, U, S_IN extends BaseStream<T>, S_OUT extends BaseStream<U>> void run(OpTestCase.TestData<T, S_IN> data, Block<U> b, Mapper<S_OUT, S_IN> m) {
+        _run(data, b, (Mapper<Stream<U>, S_IN>) m);
     }
+
+    abstract <T, U, S_IN extends BaseStream<T>> void _run(OpTestCase.TestData<T, S_IN> data, Block<U> b, Mapper<Stream<U>, S_IN> m);
 }
--- a/test-ng/tests/org/openjdk/tests/java/util/streams/StreamTestData.java	Thu Nov 15 17:35:31 2012 +0100
+++ b/test-ng/tests/org/openjdk/tests/java/util/streams/StreamTestData.java	Thu Nov 15 17:55:02 2012 +0100
@@ -28,7 +28,7 @@
 import java.util.Iterator;
 import java.util.streams.*;
 
-public abstract class StreamTestData<T> implements OpTestCase.TestData<T> {
+public abstract class StreamTestData<T> implements OpTestCase.TestData<T, Stream<T>> {
 
     @Override
     @SuppressWarnings("rawtypes")
@@ -36,16 +36,6 @@
         return StreamShapeFactory.REFERENCE;
     }
 
-    @SuppressWarnings("unchecked")
-    public Stream<T> seqStream() {
-        return (Stream<T>) seq();
-    }
-
-    @SuppressWarnings("unchecked")
-    public Stream<T> parStream() {
-        return (Stream<T>) par();
-    }
-
     public static class ArrayData<T> extends StreamTestData<T> {
         private final String name;
         private final T[] array;
@@ -56,15 +46,13 @@
         }
 
         @Override
-        @SuppressWarnings({ "rawtypes", "unchecked" })
-        public AbstractPipeline<?, T> seq() {
-            return (AbstractPipeline<?, T>) Arrays.stream(array);
+        public Stream<T> stream() {
+            return Arrays.stream(array);
         }
 
         @Override
-        @SuppressWarnings({ "rawtypes", "unchecked" })
-        public AbstractPipeline<?, T> par() {
-            return (AbstractPipeline<?, T>) Arrays.parallel(array);
+        public Stream<T> parallel() {
+            return Arrays.parallel(array);
         }
 
         @Override
@@ -98,15 +86,13 @@
         }
 
         @Override
-        @SuppressWarnings({ "rawtypes", "unchecked" })
-        public AbstractPipeline<?, T> seq() {
-            return (AbstractPipeline<?, T>) collection.stream();
+        public Stream<T> stream() {
+            return collection.stream();
         }
 
         @Override
-        @SuppressWarnings({ "rawtypes", "unchecked" })
-        public AbstractPipeline<?, T> par() {
-            return (AbstractPipeline<?, T>) collection.parallel();
+        public Stream<T> parallel() {
+            return collection.parallel();
         }
 
         @Override
--- a/test-ng/tests/org/openjdk/tests/java/util/streams/ops/ConcatOpTest.java	Thu Nov 15 17:35:31 2012 +0100
+++ b/test-ng/tests/org/openjdk/tests/java/util/streams/ops/ConcatOpTest.java	Thu Nov 15 17:55:02 2012 +0100
@@ -107,14 +107,14 @@
     @Test(dataProvider = "opArrays", dataProviderClass = StreamTestDataProvider.class)
     public void testOpsSequential(String name, StreamTestData<Integer> data) {
         withData(data).
-                usingOpFactory(() -> new ConcatOp<>(data.seqStream())).
+                usingOpFactory(() -> new ConcatOp<>(data.stream())).
                 exercise();
     }
 
     @Test(dataProvider = "opArrays", dataProviderClass = StreamTestDataProvider.class)
     public void testOpsParallel(String name, StreamTestData<Integer> data) {
         withData(data).
-                usingOpFactory(() -> new ConcatOp<>(data.parStream())).
+                usingOpFactory(() -> new ConcatOp<>(data.parallel())).
                 exercise();
     }
 }
--- a/test-ng/tests/org/openjdk/tests/java/util/streams/ops/FilterOpTest.java	Thu Nov 15 17:35:31 2012 +0100
+++ b/test-ng/tests/org/openjdk/tests/java/util/streams/ops/FilterOpTest.java	Thu Nov 15 17:55:02 2012 +0100
@@ -60,11 +60,11 @@
         assertCountSum(countTo(10).stream().filter(pTrue), 10, 55);
         assertCountSum(countTo(10).stream().filter(pEven).filter(pOdd), 0, 0);
 
-        assertStreamContents(countTo(1000), s -> s.filter(pTrue), countTo(1000));
-        assertStreamContents(countTo(1000), s -> s.filter(pFalse), countTo(0));
-        assertStreamContents(countTo(1000), s -> s.filter(e -> e > 100), range(101, 1000));
-        assertStreamContents(countTo(1000), s -> s.filter(e -> e < 100), countTo(99));
-        assertStreamContents(countTo(1000), s -> s.filter(e -> e == 100), Arrays.asList(100));
+        exerciseOps(countTo(1000), s -> s.filter(pTrue), countTo(1000));
+        exerciseOps(countTo(1000), s -> s.filter(pFalse), countTo(0));
+        exerciseOps(countTo(1000), s -> s.filter(e -> e > 100), range(101, 1000));
+        exerciseOps(countTo(1000), s -> s.filter(e -> e < 100), countTo(99));
+        exerciseOps(countTo(1000), s -> s.filter(e -> e == 100), Arrays.asList(100));
     }
 
     @Test(dataProvider = "opArrays", dataProviderClass = StreamTestDataProvider.class)
--- a/test-ng/tests/org/openjdk/tests/java/util/streams/ops/FindFirstOpTest.java	Thu Nov 15 17:35:31 2012 +0100
+++ b/test-ng/tests/org/openjdk/tests/java/util/streams/ops/FindFirstOpTest.java	Thu Nov 15 17:55:02 2012 +0100
@@ -54,12 +54,12 @@
         assertFalse(Collections.<Integer>emptySet().stream().findFirst().isPresent(), "no result");
         assertFalse(countTo(10).stream().filter(x -> x > 10).findFirst().isPresent(), "no result");
 
-        assertStreamContents(countTo(1000), s-> Arrays.asList(new Integer[] { s.filter(pEven).findFirst().get() }).stream(), Arrays.asList(2));
-        assertStreamContents(countTo(1000), s-> Arrays.asList(new Integer[] { s.findFirst().get() }).stream(), Arrays.asList(1));
-        assertStreamContents(countTo(1000), s-> Arrays.asList(new Integer[] { s.filter(e -> e == 499).findFirst().get() }).stream(), Arrays.asList(499));
-        assertStreamContents(countTo(1000), s-> Arrays.asList(new Integer[] { s.filter(e -> e == 999).findFirst().get() }).stream(), Arrays.asList(999));
-        assertStreamContents(countTo(0), s-> Arrays.asList(new Integer[] { s.findFirst().orElse(-1) }).stream(), Arrays.asList(-1));
-        assertStreamContents(countTo(1000), s-> Arrays.asList(new Integer[] { s.filter(e -> e == 1499).findFirst().orElse(-1) }).stream(), Arrays.asList(-1));
+        exerciseOps(countTo(1000), s-> Arrays.asList(new Integer[] { s.filter(pEven).findFirst().get() }).stream(), Arrays.asList(2));
+        exerciseOps(countTo(1000), s-> Arrays.asList(new Integer[] { s.findFirst().get() }).stream(), Arrays.asList(1));
+        exerciseOps(countTo(1000), s-> Arrays.asList(new Integer[] { s.filter(e -> e == 499).findFirst().get() }).stream(), Arrays.asList(499));
+        exerciseOps(countTo(1000), s-> Arrays.asList(new Integer[] { s.filter(e -> e == 999).findFirst().get() }).stream(), Arrays.asList(999));
+        exerciseOps(countTo(0), s-> Arrays.asList(new Integer[] { s.findFirst().orElse(-1) }).stream(), Arrays.asList(-1));
+        exerciseOps(countTo(1000), s-> Arrays.asList(new Integer[] { s.filter(e -> e == 1499).findFirst().orElse(-1) }).stream(), Arrays.asList(-1));
     }
 
     @Test(dataProvider = "opArrays", dataProviderClass = StreamTestDataProvider.class)
@@ -73,20 +73,20 @@
     @Test(dataProvider = "opArrays", dataProviderClass = StreamTestDataProvider.class)
     public void testPipelines(String name, StreamTestData<Integer> data) {
         Optional<Integer> seq, par;
-        seq = data.seqStream().findFirst();
-        par = data.parStream().findFirst();
+        seq = data.stream().findFirst();
+        par = data.parallel().findFirst();
         assertEquals(par, seq);
 
-        seq = data.seqStream().filter(pEven).findFirst();
-        par = data.parStream().filter(pEven).findFirst();
+        seq = data.stream().filter(pEven).findFirst();
+        par = data.parallel().filter(pEven).findFirst();
         assertEquals(par, seq);
 
-        seq = data.seqStream().filter(pTrue).findFirst();
-        par = data.parStream().filter(pTrue).findFirst();
+        seq = data.stream().filter(pTrue).findFirst();
+        par = data.parallel().filter(pTrue).findFirst();
         assertEquals(par, seq);
 
-        seq = data.seqStream().filter(pFalse).findFirst();
-        par = data.parStream().filter(pFalse).findFirst();
+        seq = data.stream().filter(pFalse).findFirst();
+        par = data.parallel().filter(pFalse).findFirst();
         assertEquals(par, seq);
     }
 }
--- a/test-ng/tests/org/openjdk/tests/java/util/streams/ops/MapOpTest.java	Thu Nov 15 17:35:31 2012 +0100
+++ b/test-ng/tests/org/openjdk/tests/java/util/streams/ops/MapOpTest.java	Thu Nov 15 17:55:02 2012 +0100
@@ -61,9 +61,9 @@
         assertCountSum(countTo(10).stream().map(mDoubler).map(mDoubler), 10, 220);
 
 
-        assertStreamContents(countTo(0), s -> s.map(e -> e), countTo(0));
-        assertStreamContents(countTo(1000), s -> s.map(e -> e), countTo(1000));
-        assertStreamContents(countTo(1000), s -> s.map(e -> (Integer) (1000+e)), range(1001, 2000));
+        exerciseOps(countTo(0), s -> s.map(e -> e), countTo(0));
+        exerciseOps(countTo(1000), s -> s.map(e -> e), countTo(1000));
+        exerciseOps(countTo(1000), s -> s.map(e -> (1000+e)), range(1001, 2000));
     }
 
     @Test(dataProvider = "opArrays", dataProviderClass = StreamTestDataProvider.class)
--- a/test-ng/tests/org/openjdk/tests/java/util/streams/ops/ReduceByOpTest.java	Thu Nov 15 17:35:31 2012 +0100
+++ b/test-ng/tests/org/openjdk/tests/java/util/streams/ops/ReduceByOpTest.java	Thu Nov 15 17:55:02 2012 +0100
@@ -48,12 +48,12 @@
 
     @Test(dataProvider = "opArrays", dataProviderClass = StreamTestDataProvider.class)
     public void testOps(String name, StreamTestData<Integer> data) {
-        Map<Boolean,Collection<Integer>> gbResult = data.seqStream().groupBy(Mappers.forPredicate(pEven, true, false));
-        Map<Boolean,Integer> result = data.seqStream().reduceBy(Mappers.forPredicate(pEven, true, false),
+        Map<Boolean,Collection<Integer>> gbResult = data.stream().groupBy(Mappers.forPredicate(pEven, true, false));
+        Map<Boolean,Integer> result = data.stream().reduceBy(Mappers.forPredicate(pEven, true, false),
                                                                 () -> 0, rPlus);
         assertEquals(result.size(), gbResult.size());
         for (Map.Entry<Boolean, Integer> entry : result.entrySet())
-            assertEquals(entry.getValue(), data.seqStream().filter(e -> pEven.test(e) == entry.getKey()).reduce(0, rPlus));
+            assertEquals(entry.getValue(), data.stream().filter(e -> pEven.test(e) == entry.getKey()).reduce(0, rPlus));
 
         int uniqueSize = data.into(new HashSet<Integer>()).size();
         Map<Integer, Collection<Integer>> mgResult = exerciseOps(data, new GroupByOp<>(mId));
--- a/test-ng/tests/org/openjdk/tests/java/util/streams/ops/SliceOpTest.java	Thu Nov 15 17:35:31 2012 +0100
+++ b/test-ng/tests/org/openjdk/tests/java/util/streams/ops/SliceOpTest.java	Thu Nov 15 17:55:02 2012 +0100
@@ -71,15 +71,15 @@
         assertCountSum(countTo(4).parallel().skip(2), 2, 7);
         assertCountSum(countTo(4).parallel().skip(0), 4, 10);
 
-        assertStreamContents(Collections.emptyList(), s -> s.skip(0), Collections.emptyList());
-        assertStreamContents(Collections.emptyList(), s -> s.skip(10), Collections.emptyList());
+        exerciseOps(Collections.emptyList(), s -> s.skip(0), Collections.emptyList());
+        exerciseOps(Collections.emptyList(), s -> s.skip(10), Collections.emptyList());
 
-        assertStreamContents(countTo(1), s -> s.skip(0), countTo(1));
-        assertStreamContents(countTo(1), s -> s.skip(1), Collections.emptyList());
-        assertStreamContents(countTo(100), s -> s.skip(0), countTo(100));
-        assertStreamContents(countTo(100), s -> s.skip(10), range(11, 100));
-        assertStreamContents(countTo(100), s -> s.skip(100), Collections.emptyList());
-        assertStreamContents(countTo(100), s -> s.skip(200), Collections.emptyList());
+        exerciseOps(countTo(1), s -> s.skip(0), countTo(1));
+        exerciseOps(countTo(1), s -> s.skip(1), Collections.<Integer>emptyList());
+        exerciseOps(countTo(100), s -> s.skip(0), countTo(100));
+        exerciseOps(countTo(100), s -> s.skip(10), range(11, 100));
+        exerciseOps(countTo(100), s -> s.skip(100), Collections.<Integer>emptyList());
+        exerciseOps(countTo(100), s -> s.skip(200), Collections.<Integer>emptyList());
     }
 
     public void testLimit() {
@@ -93,56 +93,56 @@
         assertCountSum(countTo(4).parallel().limit(4), 4, 10);
         assertCountSum(countTo(8).parallel().limit(4), 4, 10);
 
-        assertStreamContents(Collections.emptyList(), s -> s.limit(0), Collections.emptyList());
-        assertStreamContents(Collections.emptyList(), s -> s.limit(10), Collections.emptyList());
-        assertStreamContents(countTo(1), s -> s.limit(0), Collections.emptyList());
-        assertStreamContents(countTo(1), s -> s.limit(1), countTo(1));
-        assertStreamContents(countTo(100), s -> s.limit(0), Collections.emptyList());
-        assertStreamContents(countTo(100), s -> s.limit(10), countTo(10));
-        assertStreamContents(countTo(100), s -> s.limit(10).limit(10), countTo(10));
-        assertStreamContents(countTo(100), s -> s.limit(100), countTo(100));
-        assertStreamContents(countTo(100), s -> s.limit(100).limit(10), countTo(10));
-        assertStreamContents(countTo(100), s -> s.limit(200), countTo(100));
+        exerciseOps(Collections.emptyList(), s -> s.limit(0), Collections.emptyList());
+        exerciseOps(Collections.emptyList(), s -> s.limit(10), Collections.emptyList());
+        exerciseOps(countTo(1), s -> s.limit(0), Collections.<Integer>emptyList());
+        exerciseOps(countTo(1), s -> s.limit(1), countTo(1));
+        exerciseOps(countTo(100), s -> s.limit(0), Collections.<Integer>emptyList());
+        exerciseOps(countTo(100), s -> s.limit(10), countTo(10));
+        exerciseOps(countTo(100), s -> s.limit(10).limit(10), countTo(10));
+        exerciseOps(countTo(100), s -> s.limit(100), countTo(100));
+        exerciseOps(countTo(100), s -> s.limit(100).limit(10), countTo(10));
+        exerciseOps(countTo(100), s -> s.limit(200), countTo(100));
     }
 
     public void testSkipLimit() {
-        assertStreamContents(Collections.emptyList(), s -> s.skip(0).limit(0), Collections.emptyList());
-        assertStreamContents(Collections.emptyList(), s -> s.skip(0).limit(10), Collections.emptyList());
-        assertStreamContents(Collections.emptyList(), s -> s.skip(10).limit(0), Collections.emptyList());
-        assertStreamContents(Collections.emptyList(), s -> s.skip(10).limit(10), Collections.emptyList());
+        exerciseOps(Collections.emptyList(), s -> s.skip(0).limit(0), Collections.emptyList());
+        exerciseOps(Collections.emptyList(), s -> s.skip(0).limit(10), Collections.emptyList());
+        exerciseOps(Collections.emptyList(), s -> s.skip(10).limit(0), Collections.emptyList());
+        exerciseOps(Collections.emptyList(), s -> s.skip(10).limit(10), Collections.emptyList());
 
-        assertStreamContents(countTo(100), s -> s.skip(0).limit(100), countTo(100));
-        assertStreamContents(countTo(100), s -> s.skip(0).limit(10), countTo(10));
-        assertStreamContents(countTo(100), s -> s.skip(0).limit(0), Collections.emptyList());
-        assertStreamContents(countTo(100), s -> s.skip(10).limit(100), range(11, 100));
-        assertStreamContents(countTo(100), s -> s.skip(10).limit(10), range(11, 20));
-        assertStreamContents(countTo(100), s -> s.skip(10).limit(0), Collections.emptyList());
-        assertStreamContents(countTo(100), s -> s.skip(100).limit(100), Collections.emptyList());
-        assertStreamContents(countTo(100), s -> s.skip(100).limit(10), Collections.emptyList());
-        assertStreamContents(countTo(100), s -> s.skip(100).limit(0), Collections.emptyList());
-        assertStreamContents(countTo(100), s -> s.skip(200).limit(100), Collections.emptyList());
-        assertStreamContents(countTo(100), s -> s.skip(200).limit(10), Collections.emptyList());
-        assertStreamContents(countTo(100), s -> s.skip(200).limit(0), Collections.emptyList());
+        exerciseOps(countTo(100), s -> s.skip(0).limit(100), countTo(100));
+        exerciseOps(countTo(100), s -> s.skip(0).limit(10), countTo(10));
+        exerciseOps(countTo(100), s -> s.skip(0).limit(0), Collections.<Integer>emptyList());
+        exerciseOps(countTo(100), s -> s.skip(10).limit(100), range(11, 100));
+        exerciseOps(countTo(100), s -> s.skip(10).limit(10), range(11, 20));
+        exerciseOps(countTo(100), s -> s.skip(10).limit(0), Collections.<Integer>emptyList());
+        exerciseOps(countTo(100), s -> s.skip(100).limit(100), Collections.<Integer>emptyList());
+        exerciseOps(countTo(100), s -> s.skip(100).limit(10), Collections.<Integer>emptyList());
+        exerciseOps(countTo(100), s -> s.skip(100).limit(0), Collections.<Integer>emptyList());
+        exerciseOps(countTo(100), s -> s.skip(200).limit(100), Collections.<Integer>emptyList());
+        exerciseOps(countTo(100), s -> s.skip(200).limit(10), Collections.<Integer>emptyList());
+        exerciseOps(countTo(100), s -> s.skip(200).limit(0), Collections.<Integer>emptyList());
     }
 
     public void testSlice() {
-        assertStreamContents(Collections.emptyList(), s -> s.slice(0, 0), Collections.emptyList());
-        assertStreamContents(Collections.emptyList(), s -> s.slice(0, 10), Collections.emptyList());
-        assertStreamContents(Collections.emptyList(), s -> s.slice(10, 0), Collections.emptyList());
-        assertStreamContents(Collections.emptyList(), s -> s.slice(10, 10), Collections.emptyList());
+        exerciseOps(Collections.emptyList(), s -> s.slice(0, 0), Collections.emptyList());
+        exerciseOps(Collections.emptyList(), s -> s.slice(0, 10), Collections.emptyList());
+        exerciseOps(Collections.emptyList(), s -> s.slice(10, 0), Collections.emptyList());
+        exerciseOps(Collections.emptyList(), s -> s.slice(10, 10), Collections.emptyList());
 
-        assertStreamContents(countTo(100), s -> s.slice(0, 100), countTo(100));
-        assertStreamContents(countTo(100), s -> s.slice(0, 10), countTo(10));
-        assertStreamContents(countTo(100), s -> s.slice(0, 0), Collections.emptyList());
-        assertStreamContents(countTo(100), s -> s.slice(10, 100), range(11, 100));
-        assertStreamContents(countTo(100), s -> s.slice(10, 10), range(11, 20));
-        assertStreamContents(countTo(100), s -> s.slice(10, 0), Collections.emptyList());
-        assertStreamContents(countTo(100), s -> s.slice(100, 100), Collections.emptyList());
-        assertStreamContents(countTo(100), s -> s.slice(100, 10), Collections.emptyList());
-        assertStreamContents(countTo(100), s -> s.slice(100, 0), Collections.emptyList());
-        assertStreamContents(countTo(100), s -> s.slice(200, 100), Collections.emptyList());
-        assertStreamContents(countTo(100), s -> s.slice(200, 10), Collections.emptyList());
-        assertStreamContents(countTo(100), s -> s.slice(200, 0), Collections.emptyList());
+        exerciseOps(countTo(100), s -> s.slice(0, 100), countTo(100));
+        exerciseOps(countTo(100), s -> s.slice(0, 10), countTo(10));
+        exerciseOps(countTo(100), s -> s.slice(0, 0), Collections.<Integer>emptyList());
+        exerciseOps(countTo(100), s -> s.slice(10, 100), range(11, 100));
+        exerciseOps(countTo(100), s -> s.slice(10, 10), range(11, 20));
+        exerciseOps(countTo(100), s -> s.slice(10, 0), Collections.<Integer>emptyList());
+        exerciseOps(countTo(100), s -> s.slice(100, 100), Collections.<Integer>emptyList());
+        exerciseOps(countTo(100), s -> s.slice(100, 10), Collections.<Integer>emptyList());
+        exerciseOps(countTo(100), s -> s.slice(100, 0), Collections.<Integer>emptyList());
+        exerciseOps(countTo(100), s -> s.slice(200, 100), Collections.<Integer>emptyList());
+        exerciseOps(countTo(100), s -> s.slice(200, 10), Collections.<Integer>emptyList());
+        exerciseOps(countTo(100), s -> s.slice(200, 0), Collections.<Integer>emptyList());
     }
 
     private int sliceSize(int dataSize, int skip, int limit) {
@@ -161,10 +161,10 @@
         List<Integer> skips = Collections.unmodifiableList(sizes(data));
 
         for (int s : skips) {
-            Node<Integer> sr = exerciseOps(data, new SliceOp<>(s));
+            Node<Integer> sr = exerciseOps(data, st -> st.skip(s));
             assertEquals(sr.size(), sliceSize(data.size(), s));
 
-            sr = exerciseOps(data, new SliceOp<>(s), new SliceOp<>(s / 2));
+            sr = exerciseOps(data, st -> st.skip(s).skip(s / 2));
             assertEquals(sr.size(), sliceSize(sliceSize(data.size(), s), s/2));
         }
     }
@@ -176,10 +176,10 @@
 
         for (int s : skips) {
             for (int limit : limits) {
-                Node<Integer> sr = exerciseOps(data, new SliceOp<>(s), new SliceOp<>(0, limit));
+                Node<Integer> sr = exerciseOps(data, st -> st.skip(s).limit(limit));
                 assertEquals(sr.size(), sliceSize(sliceSize(data.size(), s), 0, limit));
 
-                sr = exerciseOps(data, new SliceOp<>(s, limit));
+                sr = exerciseOps(data, st -> st.slice(s, limit));
                 assertEquals(sr.size(), sliceSize(data.size(), s, limit));
             }
         }
@@ -191,10 +191,10 @@
 
         for (int limit : limits) {
             Node<Integer> sr = null;
-            sr = exerciseOps(data, new SliceOp<>(0, limit));
+            sr = exerciseOps(data, st -> st.limit(limit));
             assertEquals(sr.size(), sliceSize(data.size(), 0, limit));
 
-            sr = exerciseOps(data, new SliceOp<>(0, limit), new SliceOp<>(0, limit / 2));
+            sr = exerciseOps(data, st -> st.limit(limit).limit(limit / 2));
             assertEquals(sr.size(), sliceSize(sliceSize(data.size(), 0, limit), 0, limit/2));
         }
     }
--- a/test-ng/tests/org/openjdk/tests/java/util/streams/ops/TeeOpTest.java	Thu Nov 15 17:35:31 2012 +0100
+++ b/test-ng/tests/org/openjdk/tests/java/util/streams/ops/TeeOpTest.java	Thu Nov 15 17:55:02 2012 +0100
@@ -34,6 +34,7 @@
 import java.util.Iterators;
 import java.util.List;
 import java.util.functions.Block;
+import java.util.streams.Stream;
 import java.util.streams.ops.TeeOp;
 
 import static org.openjdk.tests.java.util.LambdaTestHelpers.*;
@@ -83,7 +84,7 @@
         class RecordingBlock implements Block<Integer> {
             List<Integer> list;
 
-            void before(TestData<Integer> td) {
+            void before(TestData<Integer, Stream<Integer>> td) {
                 // Tee block can be called concurrently
                 list = Collections.<Integer>synchronizedList(new ArrayList<>());
             }
@@ -92,7 +93,7 @@
                 list.add(t);
             }
 
-            void after(TestData<Integer> td) {
+            void after(TestData<Integer, Stream<Integer>> td) {
                 // No guarantees in parallel tests that calls to tee block will
                 // be in the encounter order, if defined, of the data
                 assertContentsUnordered(list, data.into(new ArrayList<Integer>()));
--- a/test-ng/tests/org/openjdk/tests/java/util/streams/ops/UniqOpTest.java	Thu Nov 15 17:35:31 2012 +0100
+++ b/test-ng/tests/org/openjdk/tests/java/util/streams/ops/UniqOpTest.java	Thu Nov 15 17:55:02 2012 +0100
@@ -31,6 +31,7 @@
 import org.testng.annotations.Test;
 
 import java.util.streams.ParallelPipelineHelper;
+import java.util.streams.Stream;
 import java.util.streams.StreamOpFlags;
 import java.util.streams.ops.*;
 
@@ -53,9 +54,7 @@
 
     @Test(dataProvider = "opArrays", dataProviderClass = StreamTestDataProvider.class)
     public void testOp(String name, StreamTestData<Integer> data) {
-        UniqOp<Integer> op = UniqOp.singleton();
-
-        Node<Integer> result = exerciseOps(data, op);
+        Node<Integer> result = exerciseOps(data, s -> s.uniqueElements());
 
         assertUnique(result);
         if (data.size() > 0)
@@ -68,10 +67,8 @@
 
     @Test(dataProvider = "opArrays", dataProviderClass = StreamTestDataProvider.class)
     public void testOpUnordered(String name, StreamTestData<Integer> data) {
-        UniqOp<Integer> op = UniqOp.singleton();
-
         Node<Integer> result = withData(data).
-                <Integer>using(new FlagDeclaringOp<>(StreamOpFlags.NOT_ORDERED), op).
+                using(s -> s.unordered().uniqueElements()).
                 parallelEqualator(LambdaTestHelpers::equalsContentsUnordered).
                 exercise();
 
--- a/test-ng/tests/org/openjdk/tests/java/util/streams/primitives/IntStreamIntermediateOpTestScenario.java	Thu Nov 15 17:35:31 2012 +0100
+++ b/test-ng/tests/org/openjdk/tests/java/util/streams/primitives/IntStreamIntermediateOpTestScenario.java	Thu Nov 15 17:55:02 2012 +0100
@@ -28,6 +28,7 @@
 
 import java.util.Iterator;
 import java.util.functions.Block;
+import java.util.functions.Mapper;
 import java.util.streams.*;
 import java.util.streams.ops.FlagDeclaringOp;
 import java.util.streams.ops.IntermediateOp;
@@ -40,29 +41,29 @@
 public enum IntStreamIntermediateOpTestScenario implements OpTestCase.IntermediateOpTestScenario {
 
     STREAM_FOR_EACH(false) {
-        public void run(IntStreamTestData data, IntBlock b, IntermediateOp[] ops) {
-            stream(data.seq(ops)).forEach(b);
+        <T, S_IN extends BaseStream<T>> void _run(OpTestCase.TestData<T, S_IN> data, IntBlock b, Mapper<IntStream, S_IN> m) {
+            m.map(data.stream()).forEach(b);
         }
     },
 
     STREAM_TO_ARRAY(false) {
-        public void run(IntStreamTestData data, IntBlock b, IntermediateOp[] ops) {
-            for (int t : stream(data.seq(ops)).toArray()) {
+        <T, S_IN extends BaseStream<T>> void _run(OpTestCase.TestData<T, S_IN> data, IntBlock b, Mapper<IntStream, S_IN> m) {
+            for (int t : m.map(data.stream()).toArray()) {
                 b.applyInt(t);
             }
         }
     },
 
     STREAM_ITERATOR(false) {
-        public void run(IntStreamTestData data, IntBlock b, IntermediateOp[] ops) {
-            for (IntIterator seqIter = stream(data.seq(ops)).iterator(); seqIter.hasNext(); )
+        <T, S_IN extends BaseStream<T>> void _run(OpTestCase.TestData<T, S_IN> data, IntBlock b, Mapper<IntStream, S_IN> m) {
+            for (IntIterator seqIter = m.map(data.stream()).iterator(); seqIter.hasNext(); )
                 b.applyInt(seqIter.nextInt());
         }
     },
 
     STREAM_MIXED(false) {
-        public void run(IntStreamTestData data, IntBlock b, IntermediateOp[] ops) {
-            IntStream stream = stream(data.seq(ops));
+        <T, S_IN extends BaseStream<T>> void _run(OpTestCase.TestData<T, S_IN> data, IntBlock b, Mapper<IntStream, S_IN> m) {
+            IntStream stream = m.map(data.stream());
             IntIterator iter = stream.iterator();
             if (iter.hasNext())
                 b.applyInt(iter.nextInt());
@@ -71,51 +72,51 @@
     },
 
     STREAM_MIXED_ITERATOR_FOR_EACH(false) {
-        public void run(IntStreamTestData data, IntBlock b, IntermediateOp[] ops) {
-            AbstractPipeline<?, ?> pipe1 = data.seq(new NoOp());
-            AbstractPipeline<?, ?> pipe2 = OpTestCase.chain(pipe1, ops);
+        <T, S_IN extends BaseStream<T>> void _run(OpTestCase.TestData<T, S_IN> data, IntBlock b, Mapper<IntStream, S_IN> m) {
+            AbstractPipeline<?, ?> pipe1 = data.seq(new NoOp(data.getShape()));
+            IntStream pipe2 = m.map((S_IN) pipe1);
 
             pipe1.iterator();
-            stream(pipe2).forEach(b);
+            pipe2.forEach(b);
         }
     },
 
     PAR_STREAM_SEQUENTIAL_FOR_EACH(true) {
-        public void run(IntStreamTestData data, IntBlock b, IntermediateOp[] ops) {
-            stream(data.par(ops)).sequential().forEach(b);
+        <T, S_IN extends BaseStream<T>> void _run(OpTestCase.TestData<T, S_IN> data, IntBlock b, Mapper<IntStream, S_IN> m) {
+            m.map(data.parallel()).sequential().forEach(b);
         }
     },
 
     PAR_STREAM_TO_ARRAY(true) {
-        public void run(IntStreamTestData data, IntBlock b, IntermediateOp[] ops) {
-            for (int t : stream(data.par(ops)).toArray())
+        <T, S_IN extends BaseStream<T>> void _run(OpTestCase.TestData<T, S_IN> data, IntBlock b, Mapper<IntStream, S_IN> m) {
+            for (int t : m.map(data.parallel()).toArray())
                 b.applyInt(t);
         }
     },
 
     PAR_STREAM_TO_ARRAY_CLEAR_SIZED(true) {
-        public void run(IntStreamTestData data, IntBlock b, IntermediateOp[] ops) {
-            AbstractPipeline<?, ?> pipe1 = data.seq(new FlagDeclaringOp(StreamOpFlags.NOT_SIZED) {
+        <T, S_IN extends BaseStream<T>> void _run(OpTestCase.TestData<T, S_IN> data, IntBlock b, Mapper<IntStream, S_IN> m) {
+            AbstractPipeline<?, ?> pipe1 = data.par(new FlagDeclaringOp(StreamOpFlags.NOT_SIZED) {
                 @Override
                 public StreamShape outputShape() {
-                    return StreamShapeFactory.INT_VALUE;
+                    return data.getShape();
                 }
 
                 @Override
                 public StreamShape inputShape() {
-                    return StreamShapeFactory.INT_VALUE;
+                    return data.getShape();
                 }
             });
-            AbstractPipeline<?, ?> pipe2 = OpTestCase.chain(pipe1, ops);
+            IntStream pipe2 = m.map((S_IN) pipe1);
 
-            for (int t : stream(pipe2).toArray())
+            for (int t : pipe2.toArray())
                 b.applyInt(t);
         }
     },
 
     PAR_STREAM_ITERATOR_TO_ARRAY_MIXED(true) {
-        public void run(IntStreamTestData data, IntBlock b, IntermediateOp[] ops) {
-            IntStream stream = stream(data.par(ops));
+        <T, S_IN extends BaseStream<T>> void _run(OpTestCase.TestData<T, S_IN> data, IntBlock b, Mapper<IntStream, S_IN> m) {
+            IntStream stream = m.map(data.parallel());
             IntIterator iter = stream.iterator();
             if (iter.hasNext())
                 b.applyInt(iter.nextInt());
@@ -125,20 +126,20 @@
     },
 
     PAR_STREAM_MIXED_ITERATOR_TO_ARRAY(true) {
-        public void run(IntStreamTestData data, IntBlock b, IntermediateOp[] ops) {
-            AbstractPipeline<?, ?> pipe1 = data.par(new NoOp());
-            AbstractPipeline<?, ?> pipe2 = OpTestCase.chain(pipe1, ops);
+        <T, S_IN extends BaseStream<T>> void _run(OpTestCase.TestData<T, S_IN> data, IntBlock b, Mapper<IntStream, S_IN> m) {
+            AbstractPipeline<?, ?> pipe1 = data.par(new NoOp(data.getShape()));
+            IntStream pipe2 = m.map((S_IN) pipe1);
 
             pipe1.iterator();
-            for (int t : stream(pipe2).toArray())
+            for (int t : pipe2.toArray())
                 b.applyInt(t);
         }
     },
 
     // Wrap as parallel stream, and iterate in mixed mode
     PAR_STREAM_SEQUENTIAL_MIXED(true) {
-        public void run(IntStreamTestData data, IntBlock b, IntermediateOp[] ops) {
-            IntStream stream = stream(data.par(ops));
+        <T, S_IN extends BaseStream<T>> void _run(OpTestCase.TestData<T, S_IN> data, IntBlock b, Mapper<IntStream, S_IN> m) {
+            IntStream stream = m.map(data.parallel());
             IntIterator iter = stream.iterator();
             if (iter.hasNext())
                 b.applyInt(iter.nextInt());
@@ -148,24 +149,31 @@
 
     ;
 
-    private static class NoOp implements IntermediateOp<Integer, Integer> {
+    private static class NoOp<T> implements IntermediateOp<T, T> {
+
+        private final StreamShape shape;
+
+        private NoOp(StreamShape shape) {
+            this.shape = shape;
+        }
+
         @Override
         public StreamShape inputShape() {
-            return StreamShapeFactory.INT_VALUE;
+            return shape;
         }
 
         @Override
         public StreamShape outputShape() {
-            return StreamShapeFactory.INT_VALUE;
+            return shape;
         }
 
         @Override
-        public Iterator<Integer> wrapIterator(int flags, Iterator<Integer> in) {
+        public Iterator<T> wrapIterator(int flags, Iterator<T> in) {
             return in;
         }
 
         @Override
-        public Sink<Integer> wrapSink(int flags, Sink<Integer> sink) {
+        public Sink<T> wrapSink(int flags, Sink<T> sink) {
             return sink;
         }
     }
@@ -184,14 +192,10 @@
         return isParallel;
     }
 
-    public <T> void run(OpTestCase.TestData<T> data, Block b, IntermediateOp[] ops) {
-        run((IntStreamTestData) data, Primitives.adapt(b), ops);
+    public <T, U, S_IN extends BaseStream<T>, S_OUT extends BaseStream<U>> void run(OpTestCase.TestData<T, S_IN> data, Block<U> b, Mapper<S_OUT, S_IN> m) {
+        _run(data, (IntBlock) b, (Mapper<IntStream, S_IN>) m);
     }
 
-    public abstract <T> void run(IntStreamTestData data, IntBlock b, IntermediateOp[] ops);
-
-    IntStream stream(AbstractPipeline<?, ?> ap) {
-        return (IntStream) ap;
-    }
+    abstract <T, S_IN extends BaseStream<T>> void _run(OpTestCase.TestData<T, S_IN> data, IntBlock b, Mapper<IntStream, S_IN> m);
 
 }
--- a/test-ng/tests/org/openjdk/tests/java/util/streams/primitives/IntStreamTestData.java	Thu Nov 15 17:35:31 2012 +0100
+++ b/test-ng/tests/org/openjdk/tests/java/util/streams/primitives/IntStreamTestData.java	Thu Nov 15 17:55:02 2012 +0100
@@ -27,6 +27,7 @@
 import org.openjdk.tests.java.util.streams.OpTestCase;
 
 import java.util.streams.AbstractPipeline;
+import java.util.streams.Stream;
 import java.util.streams.StreamShape;
 import java.util.streams.StreamShapeFactory;
 import java.util.streams.primitives.IntIterator;
@@ -34,7 +35,7 @@
 import java.util.streams.primitives.IntStream;
 import java.util.streams.primitives.Primitives;
 
-public abstract class IntStreamTestData implements OpTestCase.TestData<Integer> {
+public abstract class IntStreamTestData implements OpTestCase.TestData<Integer, IntStream> {
 
     @Override
     @SuppressWarnings("rawtypes")
@@ -42,16 +43,6 @@
         return StreamShapeFactory.INT_VALUE;
     }
 
-    @SuppressWarnings("unchecked")
-    public IntStream seqStream() {
-        return (IntStream) seq();
-    }
-
-    @SuppressWarnings("unchecked")
-    public IntStream parStream() {
-        return (IntStream) par();
-    }
-
     public static class ArrayData extends IntStreamTestData {
         private final String name;
         private final int[] array;
@@ -62,15 +53,13 @@
         }
 
         @Override
-        @SuppressWarnings({ "rawtypes", "unchecked" })
-        public AbstractPipeline<?, Integer> seq() {
-            return (AbstractPipeline<?, Integer>) Primitives.stream(array);
+        public IntStream stream() {
+            return Primitives.stream(array);
         }
 
         @Override
-        @SuppressWarnings({ "rawtypes", "unchecked" })
-        public AbstractPipeline<?, Integer> par() {
-            return (AbstractPipeline<?, Integer>) Primitives.parallel(array);
+        public IntStream parallel() {
+            return Primitives.parallel(array);
         }
 
         @Override