changeset 6444:c54ba7563085

Merge
author briangoetz
date Mon, 19 Nov 2012 18:05:09 -0500
parents 728100b14718 ebfb2df4ab5b
children 0a30434d0121
files
diffstat 24 files changed, 280 insertions(+), 201 deletions(-) [+]
line wrap: on
line diff
--- a/test-ng/tests/org/openjdk/tests/java/util/LambdaTestHelpers.java	Mon Nov 19 17:56:46 2012 -0500
+++ b/test-ng/tests/org/openjdk/tests/java/util/LambdaTestHelpers.java	Mon Nov 19 18:05:09 2012 -0500
@@ -242,14 +242,10 @@
 
     public static <T> boolean equalsContentsUnordered(Iterable<T> a, Iterable<T> b) {
         Set<T> sa = new HashSet<>();
-        for (T t : a) {
-            sa.add(t);
-        }
+        a.forEach(sa::add);
 
         Set<T> sb = new HashSet<>();
-        for (T t : b) {
-            sb.add(t);
-        }
+        b.forEach(sb::add);
 
         return Objects.equals(sa, sb);
     }
--- a/test-ng/tests/org/openjdk/tests/java/util/stream/OpTestCase.java	Mon Nov 19 17:56:46 2012 -0500
+++ b/test-ng/tests/org/openjdk/tests/java/util/stream/OpTestCase.java	Mon Nov 19 18:05:09 2012 -0500
@@ -72,19 +72,19 @@
 
     public <T, U, S_IN extends BaseStream<T>, S_OUT extends BaseStream<U>> Node<U> exerciseOps(TestData<T, S_IN> data, Function<S_IN, S_OUT> m) {
         return withData(data).
-                using(m).
+                stream(m).
                 exercise();
     }
 
     public <T, U, S_OUT extends BaseStream<U>> Node<U> exerciseOps(Collection<T> data, Function<Stream<T>, S_OUT> m) {
         return withData(new StreamTestData.CollectionData<>("Collection of type " + data.getClass().getName(), data)).
-                using(m).
+                stream(m).
                 exercise();
     }
 
     public <T, U, S_OUT extends BaseStream<U>, I extends Iterable<U> & Sized> Node<U> exerciseOps(Collection<T> data, Function<Stream<T>, S_OUT> m, I expected) {
         return withData(new StreamTestData.CollectionData<>("Collection of type " + data.getClass().getName(), data)).
-                using(m).
+                stream(m).
                 expectedResult(expected).
                 exercise();
     }
@@ -106,9 +106,19 @@
             return new DataIntemediateOpBuilder<>(data);
         }
 
-        public <U, S_OUT extends BaseStream<U>> ExcerciseDataStreamBuilder<T, U, S_IN, S_OUT> using(Function<S_IN, S_OUT> m) {
+        public <U, S_OUT extends BaseStream<U>> ExcerciseDataStreamBuilder<T, U, S_IN, S_OUT> stream(Function<S_IN, S_OUT> m) {
             return new ExcerciseDataStreamBuilder<>(data, m);
         }
+
+        public <R> ExcerciseDataTerminalBuilder<T, T, R, S_IN, S_IN>
+        terminal(Function<S_IN, R> terminalF) {
+            return new ExcerciseDataTerminalBuilder<>(data, s -> s, terminalF);
+        }
+
+        public <U, R, S_OUT extends BaseStream<U>> ExcerciseDataTerminalBuilder<T, U, R, S_IN, S_OUT>
+        terminal(Function<S_IN, S_OUT> streamF, Function<S_OUT, R> terminalF) {
+            return new ExcerciseDataTerminalBuilder<>(data, streamF, terminalF);
+        }
     }
 
     @SuppressWarnings("rawtypes")
@@ -133,7 +143,7 @@
         }
     }
 
-    @SuppressWarnings("rawtypes")
+    @SuppressWarnings({"rawtypes", "unchecked"})
     public class ExcerciseDataStreamBuilder<T, U, S_IN extends BaseStream<T>, S_OUT extends BaseStream<U>> {
         final TestData<T, S_IN> data;
 
@@ -246,117 +256,159 @@
         // Build method
 
         public Node<U> exercise() {
-            return exerciseOps(this);
+            if (refResult == null) {
+                // Induce the reference result
+                before.accept(data);
+                S_OUT sOut = m.apply(data.stream());
+                refResult = ((AbstractPipeline<?, U>) sOut).collectOutput().flatten();
+                after.accept(data);
+            }
+
+            for (BaseStreamTestScenario test : testSet) {
+                before.accept(data);
+
+                NodeBuilder<U> resultBuilder = shape.makeNodeBuilder(-1);
+                resultBuilder.begin(-1);
+                test.run(data, resultBuilder, m);
+                resultBuilder.end();
+                Node<U> result = resultBuilder.build();
+
+                assertTrue(getEqualator(test).test(result, refResult),
+                           String.format("%s %s: %s != %s", data.toString(), test, refResult, result));
+
+                after.accept(data);
+            }
+
+            return refResult;
         }
     }
 
-    @SuppressWarnings({"rawtypes", "unchecked"})
-    public <T, U, S_IN extends BaseStream<T>, S_OUT extends BaseStream<U>> Node<U> exerciseOps(ExcerciseDataStreamBuilder<T, U, S_IN, S_OUT> b) {
-        Node<U> refResult;
-        if (b.refResult != null) {
-            refResult = b.refResult;
-        }
-        else {
-            // Induce the reference result
-            b.before.accept(b.data);
-            S_OUT sOut = b.m.apply(b.data.stream());
-            refResult = ((AbstractPipeline<?, U>) sOut).collectOutput().flatten();
-            b.after.accept(b.data);
-        }
-
-        for (BaseStreamTestScenario test : b.testSet) {
-            b.before.accept(b.data);
-
-            NodeBuilder<U> resultBuilder = b.shape.makeNodeBuilder(-1);
-            resultBuilder.begin(-1);
-            test.run(b.data, resultBuilder, b.m);
-            resultBuilder.end();
-            Node<U> result = resultBuilder.build();
-
-            assertTrue(b.getEqualator(test).test(result, refResult),
-                       String.format("%s %s: %s != %s", b.data.toString(), test, refResult, result));
-
-            b.after.accept(b.data);
-        }
-
-        return refResult;
-    }
-
     // Exercise terminal operations
 
-    public <T, U, R, S_IN extends BaseStream<T>> R
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    public class ExcerciseDataTerminalBuilder<T, U, R, S_IN extends BaseStream<T>, S_OUT extends BaseStream<U>> {
+        final TestData<T, S_IN> data;
+
+        final Function<S_IN, S_OUT> streamF;
+
+        final Function<S_OUT, R> terminalF;
+
+        R refResult;
+
+        BiPredicate<R, R> sequentialEqualator = Objects::equals;
+
+        BiPredicate<R, R> parallelEqualator = Objects::equals;
+
+        private ExcerciseDataTerminalBuilder(TestData<T, S_IN> data, Function<S_IN, S_OUT> streamF, Function<S_OUT, R> terminalF) {
+            this.data = data;
+
+            this.streamF = Objects.requireNonNull(streamF);
+
+            this.terminalF = Objects.requireNonNull(terminalF);
+        }
+
+        //
+
+        @SuppressWarnings("unchecked")
+        public <I extends Iterable<U> & Sized> ExcerciseDataTerminalBuilder<T, U, R, S_IN, S_OUT> expectedResult(R expectedResult) {
+            this.refResult = expectedResult;
+            return this;
+        }
+
+        public ExcerciseDataTerminalBuilder<T, U, R, S_IN, S_OUT> equalator(BiPredicate<R, R> equalator) {
+            this.sequentialEqualator = equalator;
+            this.parallelEqualator = equalator;
+            return this;
+        }
+
+        public ExcerciseDataTerminalBuilder<T, U, R, S_IN, S_OUT> sequentialEqualator(BiPredicate<R, R> equalator) {
+            this.sequentialEqualator = equalator;
+            return this;
+        }
+
+        public ExcerciseDataTerminalBuilder<T, U, R, S_IN, S_OUT> parallelEqualator(BiPredicate<R, R> equalator) {
+            this.parallelEqualator = equalator;
+            return this;
+        }
+
+        // Build method
+
+        public R exercise() {
+            AbstractPipeline<?, U> ap = (AbstractPipeline<?, U>) streamF.apply(data.stream());
+            StreamShape shape = ap.getOutputShape();
+
+            Node<U> node = ap.collectOutput();
+            if (refResult == null) {
+                // Sequentially collect the output that will be input to the terminal op
+                refResult = terminalF.apply((S_OUT) shape.stream(node));
+            }
+
+            // Single sequential using pull
+            {
+                // @@@ Using specific stream implementation
+                AbstractPipeline<?, U> source = shape.stream(node);
+                // Force pull mode
+                source = source.pipeline(new PullOnlyOp<U>(shape));
+                R result = terminalF.apply((S_OUT) source);
+                assertTrue(sequentialEqualator.test(refResult, result), String.format("Single sequential pull: %s != %s", refResult, result));
+            }
+
+            // Single parallel
+            {
+                // @@@ Using specific stream implementation
+                R result = terminalF.apply((S_OUT) shape.parallel(node));
+                assertTrue(parallelEqualator.test(refResult, result), String.format("Single parallel: %s != %s", refResult, result));
+            }
+
+            // All sequential
+            // This may push or pull depending on the terminal op implementation
+            {
+                R result = terminalF.apply(streamF.apply(data.stream()));
+                assertTrue(sequentialEqualator.test(refResult, result), String.format("All sequential: %s != %s", refResult, result));
+            }
+
+            // All sequential using pull
+            {
+                S_OUT source = streamF.apply(data.stream());
+                // Force pull mode
+                source = (S_OUT) ((AbstractPipeline) source).pipeline(new PullOnlyOp<U>(shape));
+                R result = terminalF.apply(source);
+                assertTrue(sequentialEqualator.test(refResult, result), String.format("All sequential pull: %s != %s", refResult, result));
+            }
+
+            // All parallel
+            {
+                R result = terminalF.apply(streamF.apply(data.parallel()));
+                assertTrue(parallelEqualator.test(refResult, result), String.format("All parallel: %s != %s", refResult, result));
+            }
+
+            return refResult;
+        }
+    }
+
+    public <T, R> R exerciseTerminalOps(Collection<T> data, Function<Stream<T>, R> m, R expected) {
+        return withData(new StreamTestData.CollectionData<>("Collection of type " + data.getClass().getName(), data)).
+                terminal(m).
+                expectedResult(expected).
+                exercise();
+    }
+
+    public <T, R, S_IN extends BaseStream<T>> R
     exerciseTerminalOps(TestData<T, S_IN> data,
                         Function<S_IN, R> terminalF) {
-        return exerciseTerminalOps(data,
-                                   s -> s,
-                                   terminalF);
+        return withData(data).
+                terminal(s -> s, terminalF).
+                exercise();
     }
 
     @SuppressWarnings({"rawtypes", "unchecked"})
     public <T, U, R, S_IN extends BaseStream<T>, S_OUT extends BaseStream<U>> R
     exerciseTerminalOps(TestData<T, S_IN> data,
-                        Function<S_IN, S_OUT> m,
-                        Function<S_OUT, R> terminalF) {
-        return exerciseTerminalOps(data,
-                                   (u, v) -> Objects.equals(u, v),
-                                   m,
-                                   terminalF);
-    }
-
-    @SuppressWarnings({"rawtypes", "unchecked"})
-    public <T, U, R, S_IN extends BaseStream<T>, S_OUT extends BaseStream<U>> R
-    exerciseTerminalOps(TestData<T, S_IN> data,
-                        BiPredicate<R, R> equalator,
                         Function<S_IN, S_OUT> streamF,
                         Function<S_OUT, R> terminalF) {
-
-        AbstractPipeline<?, U> ap = (AbstractPipeline<?, U>) streamF.apply(data.stream());
-        StreamShape shape = ap.getOutputShape();
-
-        // Sequentially collect the output that will be input to the terminal op
-        Node<U> node = ap.collectOutput();
-        R refResult = terminalF.apply((S_OUT) shape.stream(node));
-
-        // Single sequential using pull
-        {
-            // @@@ Using specific stream implementation
-            AbstractPipeline<?, U> source = shape.stream(node);
-            // Force pull mode
-            source = source.pipeline(new PullOnlyOp<U>(shape));
-            R result = terminalF.apply((S_OUT) source);
-            assertTrue(equalator.test(refResult, result), String.format("Single sequential pull: %s != %s", refResult, result));
-        }
-
-        // Single parallel
-        {
-            // @@@ Using specific stream implementation
-            R result = terminalF.apply((S_OUT) shape.parallel(node));
-            assertTrue(equalator.test(refResult, result), String.format("Single parallel: %s != %s", refResult, result));
-        }
-
-        // All sequential
-        // This may push or pull depending on the terminal op implementation
-        {
-            R result = terminalF.apply(streamF.apply(data.stream()));
-            assertTrue(equalator.test(refResult, result), String.format("All sequential: %s != %s", refResult, result));
-        }
-
-        // All sequential using pull
-        {
-            S_OUT source = streamF.apply(data.stream());
-            // Force pull mode
-            source = (S_OUT) ((AbstractPipeline) source).pipeline(new PullOnlyOp<U>(shape));
-            R result = terminalF.apply(source);
-            assertTrue(equalator.test(refResult, result), String.format("All sequential pull: %s != %s", refResult, result));
-        }
-
-        // All parallel
-        {
-            R result = terminalF.apply(streamF.apply(data.parallel()));
-            assertTrue(equalator.test(refResult, result), String.format("All parallel: %s != %s", refResult, result));
-        }
-
-        return refResult;
+        return withData(data).
+                terminal(streamF, terminalF).
+                exercise();
     }
 
     //
--- a/test-ng/tests/org/openjdk/tests/java/util/stream/StreamReuseTest.java	Mon Nov 19 17:56:46 2012 -0500
+++ b/test-ng/tests/org/openjdk/tests/java/util/stream/StreamReuseTest.java	Mon Nov 19 18:05:09 2012 -0500
@@ -86,7 +86,7 @@
 
     // Stream
 
-    @Test(dataProvider = "opArrays", dataProviderClass = StreamTestDataProvider.class)
+    @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
     public void testTwoTerminals(String name, StreamTestData<Integer> data) {
         assertSecondFails(data,
                           (Stream<Integer> s) -> s.findFirst(), (Stream<Integer> s) -> s.findFirst(),
@@ -94,7 +94,7 @@
                           "Stream findFirst / findFirst succeeded erroneously");
     }
 
-    @Test(dataProvider = "opArrays", dataProviderClass = StreamTestDataProvider.class)
+    @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
     public void testTwoIterators(String name, StreamTestData<Integer> data) {
         assertSecondFails(data,
                           (Stream<Integer> s) -> s.iterator(), (Stream<Integer> s) -> s.iterator(),
@@ -102,7 +102,7 @@
                           "Stream iterator / iterator succeeded erroneously");
     }
 
-    @Test(dataProvider = "opArrays", dataProviderClass = StreamTestDataProvider.class)
+    @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
     public void testTerminalIterator(String name, StreamTestData<Integer> data) {
         assertSecondFails(data,
                           (Stream<Integer> s) -> s.iterator(), (Stream<Integer> s) -> s.findFirst(),
--- a/test-ng/tests/org/openjdk/tests/java/util/stream/StreamTestDataProvider.java	Mon Nov 19 17:56:46 2012 -0500
+++ b/test-ng/tests/org/openjdk/tests/java/util/stream/StreamTestDataProvider.java	Mon Nov 19 18:05:09 2012 -0500
@@ -107,9 +107,9 @@
         return new Object[] { description, m.combine(description, data) };
     }
 
-    // Return an array of ( String name, TestData<Integer> )
-    @DataProvider(name = "opArrays")
-    public static Object[][] makeValueTestData() {
+    // Return an array of ( String name, StreamTestData<Integer> )
+    @DataProvider(name = "StreamTestData<Integer>")
+    public static Object[][] makeStreamTestData() {
         return testData;
     }
 }
--- a/test-ng/tests/org/openjdk/tests/java/util/stream/op/ConcatOpTest.java	Mon Nov 19 17:56:46 2012 -0500
+++ b/test-ng/tests/org/openjdk/tests/java/util/stream/op/ConcatOpTest.java	Mon Nov 19 18:05:09 2012 -0500
@@ -29,11 +29,9 @@
 import org.openjdk.tests.java.util.stream.StreamTestData;
 import org.openjdk.tests.java.util.stream.StreamTestDataProvider;
 import org.testng.annotations.Test;
-import sun.jvmstat.monitor.IntegerMonitor;
 
 import java.util.*;
 import java.util.stream.Streams;
-import java.util.stream.op.ConcatOp;
 
 import static org.openjdk.tests.java.util.LambdaTestHelpers.*;
 
@@ -104,17 +102,17 @@
                        countTo(10).stream().iterator());
     }
 
-    @Test(dataProvider = "opArrays", dataProviderClass = StreamTestDataProvider.class)
+    @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
     public void testOpsSequential(String name, StreamTestData<Integer> data) {
         withData(data).
-                using(s -> s.concat(data.stream())).
+                stream(s -> s.concat(data.stream())).
                 exercise();
     }
 
-    @Test(dataProvider = "opArrays", dataProviderClass = StreamTestDataProvider.class)
+    @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
     public void testOpsParallel(String name, StreamTestData<Integer> data) {
         withData(data).
-                using(s -> s.concat(data.parallel())).
+                stream(s -> s.concat(data.parallel())).
                 exercise();
     }
 }
--- a/test-ng/tests/org/openjdk/tests/java/util/stream/op/CumulateOpTest.java	Mon Nov 19 17:56:46 2012 -0500
+++ b/test-ng/tests/org/openjdk/tests/java/util/stream/op/CumulateOpTest.java	Mon Nov 19 18:05:09 2012 -0500
@@ -55,7 +55,7 @@
         assertContents(countTo(5).stream().cumulate(rMin).iterator(), 1, 1, 1, 1, 1);
     }
 
-    @Test(dataProvider = "opArrays", dataProviderClass = StreamTestDataProvider.class)
+    @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
     public void testOps(String name, StreamTestData<Integer> data) {
         Node<Integer> result = exerciseOps(data, s -> s.cumulate(rPlus));
         assertEquals(result.size(), data.size());
@@ -67,7 +67,7 @@
         assertEquals(result.size(), data.size());
     }
 
-    @Test(dataProvider = "opArrays", dataProviderClass = StreamTestDataProvider.class)
+    @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
     public void testWithFilter(String name, StreamTestData<Integer> data) {
         exerciseOps(data, s -> s.filter(pEven).cumulate(rPlus));
         exerciseOps(data, s -> s.filter(pEven).map(mDoubler).cumulate(rPlus));
--- a/test-ng/tests/org/openjdk/tests/java/util/stream/op/FilterOpTest.java	Mon Nov 19 17:56:46 2012 -0500
+++ b/test-ng/tests/org/openjdk/tests/java/util/stream/op/FilterOpTest.java	Mon Nov 19 18:05:09 2012 -0500
@@ -67,7 +67,7 @@
         exerciseOps(countTo(1000), s -> s.filter(e -> e == 100), Arrays.asList(100));
     }
 
-    @Test(dataProvider = "opArrays", dataProviderClass = StreamTestDataProvider.class)
+    @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
     public void testOps(String name, StreamTestData<Integer> data) {
         Node<Integer> result = exerciseOps(data, s -> s.filter(pTrue));
         assertEquals(result.size(), data.size());
--- a/test-ng/tests/org/openjdk/tests/java/util/stream/op/FindAnyOpTest.java	Mon Nov 19 17:56:46 2012 -0500
+++ b/test-ng/tests/org/openjdk/tests/java/util/stream/op/FindAnyOpTest.java	Mon Nov 19 18:05:09 2012 -0500
@@ -62,13 +62,14 @@
         assertTrue(countTo(1000).parallel().filter(pEven).findAny().isPresent(), "with result");
     }
 
-    @Test(dataProvider = "opArrays", dataProviderClass = StreamTestDataProvider.class)
+    @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
     public void testOps(String name, StreamTestData<Integer> data) {
         // @@@ Weak test -- only tests that all versions either find an answer or don't, don't assert validity of answer
         // Would be good to test that the result is actually a member of the stream
         BiPredicate<Optional<Integer>, Optional<Integer>> validAnswer = (a, b) -> a.isPresent() == b.isPresent();
-        exerciseTerminalOps(data, validAnswer, s -> s, s -> s.findAny());
-        exerciseTerminalOps(data, validAnswer, s -> s.filter(pTrue), s -> s.findAny());
-        exerciseTerminalOps(data, validAnswer, s -> s.filter(pEven), s -> s.findAny());
+
+        withData(data).terminal(s -> s.findAny()).equalator(validAnswer).exercise();
+        withData(data).terminal(s -> s.filter(pTrue), s -> s.findAny()).equalator(validAnswer).exercise();
+        withData(data).terminal(s -> s.filter(pEven), s -> s.findAny()).equalator(validAnswer).exercise();
     }
 }
--- a/test-ng/tests/org/openjdk/tests/java/util/stream/op/FindFirstOpTest.java	Mon Nov 19 17:56:46 2012 -0500
+++ b/test-ng/tests/org/openjdk/tests/java/util/stream/op/FindFirstOpTest.java	Mon Nov 19 18:05:09 2012 -0500
@@ -62,7 +62,7 @@
         exerciseOps(countTo(1000), s-> Arrays.asList(new Integer[] { s.filter(e -> e == 1499).findFirst().orElse(-1) }).stream(), Arrays.asList(-1));
     }
 
-    @Test(dataProvider = "opArrays", dataProviderClass = StreamTestDataProvider.class)
+    @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
     public void testOps(String name, StreamTestData<Integer> data) {
         exerciseTerminalOps(data, s -> s.findFirst());
         exerciseTerminalOps(data, s -> s.filter(pEven), s -> s.findFirst());
@@ -70,7 +70,7 @@
         exerciseTerminalOps(data, s -> s.filter(pFalse), s -> s.findFirst());
     }
 
-    @Test(dataProvider = "opArrays", dataProviderClass = StreamTestDataProvider.class)
+    @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
     public void testPipelines(String name, StreamTestData<Integer> data) {
         Optional<Integer> seq, par;
         seq = data.stream().findFirst();
--- a/test-ng/tests/org/openjdk/tests/java/util/stream/op/FlagOpTest.java	Mon Nov 19 17:56:46 2012 -0500
+++ b/test-ng/tests/org/openjdk/tests/java/util/stream/op/FlagOpTest.java	Mon Nov 19 18:05:09 2012 -0500
@@ -42,7 +42,7 @@
 @Test
 public class FlagOpTest extends OpTestCase {
 
-    @Test(dataProvider = "opArrays", dataProviderClass = StreamTestDataProvider.class)
+    @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
     public void testFlagsPassThrough(String name, StreamTestData<Integer> data) {
 
         @SuppressWarnings({"unchecked", "rawtypes"})
--- a/test-ng/tests/org/openjdk/tests/java/util/stream/op/FlatMapOpTest.java	Mon Nov 19 17:56:46 2012 -0500
+++ b/test-ng/tests/org/openjdk/tests/java/util/stream/op/FlatMapOpTest.java	Mon Nov 19 18:05:09 2012 -0500
@@ -73,7 +73,7 @@
         exerciseOps(new StreamTestData.ArrayData<>("LONG_STRING", new String[]{LONG_STRING}), s -> s.flatMap(flattenChars));
     }
 
-    @Test(dataProvider = "opArrays", dataProviderClass = StreamTestDataProvider.class)
+    @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
     public void testOps(String name, StreamTestData<Integer> data) {
         Node<Integer> result = exerciseOps(data, s -> s.flatMap(mfId));
         assertEquals(data.size(), result.size());
--- a/test-ng/tests/org/openjdk/tests/java/util/stream/op/ForEachOpTest.java	Mon Nov 19 17:56:46 2012 -0500
+++ b/test-ng/tests/org/openjdk/tests/java/util/stream/op/ForEachOpTest.java	Mon Nov 19 18:05:09 2012 -0500
@@ -25,10 +25,15 @@
 package org.openjdk.tests.java.util.stream.op;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Stream;
 import java.util.stream.op.ForEachOp;
+import java.util.stream.primitive.IntNodeBuilder;
+import java.util.stream.primitive.IntNodes;
 
+import org.openjdk.tests.java.util.LambdaTestHelpers;
 import org.openjdk.tests.java.util.stream.OpTestCase;
 import org.openjdk.tests.java.util.stream.StreamTestData;
 import org.openjdk.tests.java.util.stream.StreamTestDataProvider;
@@ -44,26 +49,39 @@
 @Test
 public class ForEachOpTest extends OpTestCase {
 
-    public void testForEachParallel() {
-        AtomicInteger count = new AtomicInteger(0);
-        AtomicInteger sum = new AtomicInteger(0);
-        countTo(10).parallel().forEach(e -> { count.incrementAndGet(); sum.addAndGet(e); });
-        assertEquals(count.get(), 10);
-        assertEquals(sum.get(), 55);
+    public void testForEach() {
+        exerciseTerminalOps(countTo(10),
+                            s -> {
+                                AtomicInteger count = new AtomicInteger(0);
+                                s.forEach(e -> {
+                                    count.incrementAndGet();
+                                });
+                                return count.get();
+                            },
+                            10);
+
+        exerciseTerminalOps(countTo(10),
+                            s -> {
+                                AtomicInteger sum = new AtomicInteger(0);
+                                s.forEach(e -> {
+                                    sum.addAndGet(e);
+                                });
+                                return sum.get();
+                            },
+                            55);
     }
 
-    public void testForEachStream() {
-        List<Integer> counted = countTo(10);
-        List<Integer> eached = new ArrayList<>();
-        counted.stream().forEach(e -> { eached.add(e); });
-        assertContents(eached.iterator(), counted.iterator());
-    }
-
-    @Test(dataProvider = "opArrays", dataProviderClass = StreamTestDataProvider.class)
+    @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
     @SuppressWarnings("unchecked")
     public void testOps(String name, StreamTestData<Integer> data) {
-       // @@@ assert contents, unordered or parallel evaluation
-       exerciseTerminalOps(data, s -> { s.forEach(bEmpty); return null; });
+        withData(data).terminal(s -> {
+            List<Integer> l = Collections.synchronizedList(new ArrayList<Integer>());
+            s.forEach(e -> {
+                l.add(e);
+            });
+            return l;
+        }).parallelEqualator(LambdaTestHelpers::equalsContentsUnordered).exercise();
+
     }
 
     // @@@ Move out into separate test class
@@ -71,7 +89,16 @@
     @Test(dataProvider = "IntStreamTestData", dataProviderClass = IntStreamTestDataProvider.class)
     @SuppressWarnings("unchecked")
     public void testIntOps(String name, IntStreamTestData data) {
-        // @@@ assert contents, unordered or parallel evaluation
-        exerciseTerminalOps(data, s -> { s.forEach(bIntEmpty); return null; });
+        withData(data).terminal(s -> {
+            IntNodeBuilder nb = IntNodes.makeBuilder(-1);
+            nb.begin(-1);
+            s.forEach(e -> {
+                synchronized (nb) {
+                    nb.accept(e);
+                }
+            });
+            nb.end();
+            return nb.build();
+        }).parallelEqualator(LambdaTestHelpers::equalsContentsUnordered).exercise();
     }
 }
--- a/test-ng/tests/org/openjdk/tests/java/util/stream/op/GroupByOpTest.java	Mon Nov 19 17:56:46 2012 -0500
+++ b/test-ng/tests/org/openjdk/tests/java/util/stream/op/GroupByOpTest.java	Mon Nov 19 18:05:09 2012 -0500
@@ -104,7 +104,7 @@
         );
     }
 
-    @Test(dataProvider = "opArrays", dataProviderClass = StreamTestDataProvider.class)
+    @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
     public void testOps(String name, StreamTestData<Integer> data) {
         // @@@ More things to test here:
         //     - Every value in data is present in right bucket
@@ -117,23 +117,23 @@
         }
     }
 
-    @Test(dataProvider = "opArrays", dataProviderClass = StreamTestDataProvider.class)
+    @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
     public void testWithUnorderedOp(String name, StreamTestData<Integer> data) {
         // @@@ More things to test here:
         //     - Every value in data is present in right bucket
         //     - Total number of values equals size of data
 
         for (MapperData<Integer, ?> md : getMapperData(data)) {
-            Map<?, Collection<Integer>> result = exerciseTerminalOps(data,
-                                                             this::multiMapEquals,
-                                                             s -> s.unordered(),
-                                                             s -> s.groupBy(md.m));
+            Map<?, Collection<Integer>> result = withData(data).
+                    terminal(s -> s.unordered(), s -> s.groupBy(md.m)).
+                    parallelEqualator(this::multiMapEquals).
+                    exercise();
             assertEquals(result.keySet().size(), md.expectedSize);
         }
     }
 
     // @@@ Add back when there is a supplier parameter on Stream.groupBy
-//    @Test(dataProvider = "opArrays", dataProviderClass = StreamTestDataProvider.class)
+//    @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
 //    public void testWithValueFactory(String name, StreamTestData<Integer> data) {
 //        Map<Integer, Collection<Integer>> miResult = exerciseOps(data, new GroupByOp<Integer, Integer>(mId, LinkedList::new));
 //
--- a/test-ng/tests/org/openjdk/tests/java/util/stream/op/MapOpTest.java	Mon Nov 19 17:56:46 2012 -0500
+++ b/test-ng/tests/org/openjdk/tests/java/util/stream/op/MapOpTest.java	Mon Nov 19 18:05:09 2012 -0500
@@ -68,7 +68,7 @@
         exerciseOps(countTo(1000), s -> s.map(e -> (Integer) (1000+e)), range(1001, 2000));
     }
 
-    @Test(dataProvider = "opArrays", dataProviderClass = StreamTestDataProvider.class)
+    @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
     public void testOps(String name, StreamTestData<Integer> data) {
         Node<Integer> result = exerciseOps(data, s -> s.map(mId));
         assertEquals(data.size(), result.size());
--- a/test-ng/tests/org/openjdk/tests/java/util/stream/op/MatchOpTest.java	Mon Nov 19 17:56:46 2012 -0500
+++ b/test-ng/tests/org/openjdk/tests/java/util/stream/op/MatchOpTest.java	Mon Nov 19 18:05:09 2012 -0500
@@ -77,7 +77,7 @@
         assertPredicates(countTo(5), MatchKind.NONE, INTEGER_PREDICATES, false, true, false, false);
     }
 
-    @Test(dataProvider = "opArrays", dataProviderClass = StreamTestDataProvider.class)
+    @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
     public void testOps(String name, StreamTestData<Integer> data) {
         Map<MatchKind, Function<Predicate<Integer>, Function<Stream<Integer>, Boolean>>> m = new HashMap<>();
         m.put(MatchKind.ANY, p -> s -> s.anyMatch(p));
--- a/test-ng/tests/org/openjdk/tests/java/util/stream/op/MinMaxByTest.java	Mon Nov 19 17:56:46 2012 -0500
+++ b/test-ng/tests/org/openjdk/tests/java/util/stream/op/MinMaxByTest.java	Mon Nov 19 18:05:09 2012 -0500
@@ -45,7 +45,7 @@
         assertEquals(1000, (int) countTo(1000).stream().max(Integer::compare).get());
     }
 
-    @Test(dataProvider = "opArrays", dataProviderClass = StreamTestDataProvider.class)
+    @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
     public void testOps(String name, StreamTestData<Integer> data) {
         exerciseTerminalOps(data, s -> s.min(Integer::compare));
         exerciseTerminalOps(data, s -> s.max(Integer::compare));
--- a/test-ng/tests/org/openjdk/tests/java/util/stream/op/ReduceByOpTest.java	Mon Nov 19 17:56:46 2012 -0500
+++ b/test-ng/tests/org/openjdk/tests/java/util/stream/op/ReduceByOpTest.java	Mon Nov 19 18:05:09 2012 -0500
@@ -46,7 +46,7 @@
 @Test
 public class ReduceByOpTest extends OpTestCase {
 
-    @Test(dataProvider = "opArrays", dataProviderClass = StreamTestDataProvider.class)
+    @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
     public void testOps(String name, StreamTestData<Integer> data) {
         Map<Boolean,Collection<Integer>> gbResult = data.stream().groupBy(Functions.forPredicate(pEven, true, false));
         Map<Boolean,Integer> result = data.stream().reduceBy(Functions.forPredicate(pEven, true, false),
--- a/test-ng/tests/org/openjdk/tests/java/util/stream/op/ReduceTest.java	Mon Nov 19 17:56:46 2012 -0500
+++ b/test-ng/tests/org/openjdk/tests/java/util/stream/op/ReduceTest.java	Mon Nov 19 18:05:09 2012 -0500
@@ -64,7 +64,7 @@
         assertEquals(2, (int) list.stream().map(mDoubler).reduce(rMin).get());
     }
 
-    @Test(dataProvider = "opArrays", dataProviderClass = StreamTestDataProvider.class)
+    @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
     public void testOps(String name, StreamTestData<Integer> data) {
         assertEquals(0, (int) exerciseTerminalOps(data, s -> s.filter(pFalse), s -> s.fold(() -> 0, rPlus, rPlus)));
 
--- a/test-ng/tests/org/openjdk/tests/java/util/stream/op/SliceOpTest.java	Mon Nov 19 17:56:46 2012 -0500
+++ b/test-ng/tests/org/openjdk/tests/java/util/stream/op/SliceOpTest.java	Mon Nov 19 18:05:09 2012 -0500
@@ -156,7 +156,7 @@
         return Math.max(0, dataSize - skip);
     }
 
-    @Test(dataProvider = "opArrays", dataProviderClass = StreamTestDataProvider.class)
+    @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
     public void testSkipOps(String name, StreamTestData<Integer> data) {
         List<Integer> skips = Collections.unmodifiableList(sizes(data));
 
@@ -169,7 +169,7 @@
         }
     }
 
-    @Test(dataProvider = "opArrays", dataProviderClass = StreamTestDataProvider.class)
+    @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
     public void testSkipLimitOps(String name, StreamTestData<Integer> data) {
         List<Integer> skips = Collections.unmodifiableList(sizes(data));
         List<Integer> limits = Collections.unmodifiableList(sizes(data));
@@ -185,7 +185,7 @@
         }
     }
 
-    @Test(dataProvider = "opArrays", dataProviderClass = StreamTestDataProvider.class)
+    @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
     public void testLimitOps(String name, StreamTestData<Integer> data) {
         List<Integer> limits = Collections.unmodifiableList(sizes(data));
 
--- a/test-ng/tests/org/openjdk/tests/java/util/stream/op/SortedOpTest.java	Mon Nov 19 17:56:46 2012 -0500
+++ b/test-ng/tests/org/openjdk/tests/java/util/stream/op/SortedOpTest.java	Mon Nov 19 18:05:09 2012 -0500
@@ -106,7 +106,7 @@
         assertSorted(SortedOp.iterator(list.iterator(), cInteger));
     }
 
-    @Test(dataProvider = "opArrays", dataProviderClass = StreamTestDataProvider.class)
+    @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
     public void testOps(String name, StreamTestData<Integer> data) {
         SortedOp<Integer> op = new SortedOp<>(cInteger);
 
@@ -119,7 +119,7 @@
         Assert.assertEquals(result.size(), data.size());
     }
 
-    @Test(dataProvider = "opArrays", dataProviderClass = StreamTestDataProvider.class)
+    @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
     public void testSortSort(String name, StreamTestData<Integer> data) {
         SortedOp<Integer> op = new SortedOp<>(cInteger);
 
--- a/test-ng/tests/org/openjdk/tests/java/util/stream/op/TeeOpTest.java	Mon Nov 19 17:56:46 2012 -0500
+++ b/test-ng/tests/org/openjdk/tests/java/util/stream/op/TeeOpTest.java	Mon Nov 19 18:05:09 2012 -0500
@@ -79,7 +79,7 @@
         assertCountSum(copy.iterator(), 10, 110);
     }
 
-    @Test(dataProvider = "opArrays", dataProviderClass = StreamTestDataProvider.class)
+    @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
     public void testOps(String name, final StreamTestData<Integer> data) {
         class RecordingBlock implements Block<Integer> {
             List<Integer> list;
@@ -103,7 +103,7 @@
         final RecordingBlock b = new RecordingBlock();
 
         withData(data).
-                using(s -> s.tee(b)).
+                stream(s -> s.tee(b)).
                 before(b::before).
                 after(b::after).
                 exercise();
--- a/test-ng/tests/org/openjdk/tests/java/util/stream/op/ToArrayOpTest.java	Mon Nov 19 17:56:46 2012 -0500
+++ b/test-ng/tests/org/openjdk/tests/java/util/stream/op/ToArrayOpTest.java	Mon Nov 19 18:05:09 2012 -0500
@@ -52,56 +52,62 @@
         assertCountSum(Arrays.asList(countTo(10).stream().toArray()), 10, 55);
     }
 
-    @Test(dataProvider = "opArrays", dataProviderClass = StreamTestDataProvider.class)
+    @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
     public void testOps(String name, StreamTestData<Integer> data) {
-        exerciseTerminalOps(data, Arrays::equals,
-                            s -> s,
-                            s -> s.toArray());
+        withData(data).
+                terminal(s -> s.toArray()).
+                equalator(Arrays::equals).
+                exercise();
     }
 
-    @Test(dataProvider = "opArrays", dataProviderClass = StreamTestDataProvider.class)
+    @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
     public void testOpsWithMap(String name, StreamTestData<Integer> data) {
         // Retain the size of the source
         // This should kick in the parallel evaluation optimization for tasks stuffing elements into a shared array
-        Object[] objects = exerciseTerminalOps(data, Arrays::equals,
-                                               s -> s.map(i -> (Integer) (i + i)),
-                                               s -> s.toArray());
+        Object[] objects = withData(data).
+                terminal(s -> s.map(i -> (Integer) (i + i)), s -> s.toArray()).
+                equalator(Arrays::equals).
+                exercise();
         assertTrue(objects.length == data.size());
     }
 
-    @Test(dataProvider = "opArrays", dataProviderClass = StreamTestDataProvider.class)
+    @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
     public void testOpsWithFlatMap(String name, StreamTestData<Integer> data) {
         // Double the size of the source
-        Object[] objects = exerciseTerminalOps(data, Arrays::equals,
-                                               s -> s.flatMap((Block<? super Integer> b, Integer e) -> {
-                                                   b.accept(e);
-                                                   b.accept(e);
-                                               }),
-                                               s -> s.toArray());
+
+        Object[] objects = withData(data).
+                terminal(s -> s.flatMap((Block<? super Integer> b, Integer e) -> {
+                    b.accept(e);
+                    b.accept(e);
+                }), s -> s.toArray()).
+                equalator(Arrays::equals).
+                exercise();
         assertTrue(objects.length == data.size() * 2);
     }
 
-    @Test(dataProvider = "opArrays", dataProviderClass = StreamTestDataProvider.class)
+    @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
     public void testOpsWithSorted(String name, StreamTestData<Integer> data) {
         // Retain the size of the source
         // This should kick in the parallel evaluation optimization for tasks stuffing elements into a shared array
-        Object[] objects = exerciseTerminalOps(data, Arrays::equals,
-                                               s -> s.sorted(cInteger),
-                                               s -> s.toArray());
+        Object[] objects = withData(data).
+                terminal(s -> s.sorted(cInteger), s -> s.toArray()).
+                equalator(Arrays::equals).
+                exercise();
         assertTrue(objects.length == data.size());
     }
 
-    @Test(dataProvider = "opArrays", dataProviderClass = StreamTestDataProvider.class)
+    @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
     public void testOpsWithCumulate(String name, StreamTestData<Integer> data) {
         // Retain the size of the source
         // This should kick in the parallel evaluation optimization for tasks stuffing elements into a shared array
-        Object[] objects = exerciseTerminalOps(data, Arrays::equals,
-                                               s -> s.cumulate(rPlus),
-                                               s -> s.toArray());
+        Object[] objects = withData(data).
+                terminal(s -> s.cumulate(rPlus), s -> s.toArray()).
+                equalator(Arrays::equals).
+                exercise();
         assertTrue(objects.length == data.size());
     }
 
-    @Test(dataProvider = "opArrays", dataProviderClass = StreamTestDataProvider.class)
+    @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
     public void testWithNode(String name, StreamTestData<Integer> data) {
         List<Integer> l = new ArrayList<>();
         for (Integer i : data) {
--- a/test-ng/tests/org/openjdk/tests/java/util/stream/op/UniqOpTest.java	Mon Nov 19 17:56:46 2012 -0500
+++ b/test-ng/tests/org/openjdk/tests/java/util/stream/op/UniqOpTest.java	Mon Nov 19 18:05:09 2012 -0500
@@ -31,7 +31,6 @@
 import org.testng.annotations.Test;
 
 import java.util.stream.ParallelPipelineHelper;
-import java.util.stream.Stream;
 import java.util.stream.StreamOpFlags;
 import java.util.stream.op.*;
 
@@ -52,7 +51,7 @@
         assertCountSum(countTo(10).stream().uniqueElements(), 10, 55);
     }
 
-    @Test(dataProvider = "opArrays", dataProviderClass = StreamTestDataProvider.class)
+    @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
     public void testOp(String name, StreamTestData<Integer> data) {
         Node<Integer> result = exerciseOps(data, s -> s.uniqueElements());
 
@@ -65,10 +64,10 @@
     }
 
 
-    @Test(dataProvider = "opArrays", dataProviderClass = StreamTestDataProvider.class)
+    @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
     public void testOpUnordered(String name, StreamTestData<Integer> data) {
         Node<Integer> result = withData(data).
-                using(s -> s.unordered().uniqueElements()).
+                stream(s -> s.unordered().uniqueElements()).
                 parallelEqualator(LambdaTestHelpers::equalsContentsUnordered).
                 exercise();
 
@@ -80,21 +79,21 @@
         assertTrue(result.size() <= data.size());
     }
 
-    @Test(dataProvider = "opArrays", dataProviderClass = StreamTestDataProvider.class)
+    @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
     public void testUniqUniq(String name, StreamTestData<Integer> data) {
         withData(data).ops().
                 using(UniqOp.singleton(), UniqOp.singleton(), new TestParallelSizedOp<>()).
                 exercise();
     }
 
-    @Test(dataProvider = "opArrays", dataProviderClass = StreamTestDataProvider.class)
+    @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
     public void testUniqSort(String name, StreamTestData<Integer> data) {
         withData(data).ops().
                 using(UniqOp.singleton(), new SortedOp<>(), new TestParallelSizedOp<>()).
                 exercise();
     }
 
-    @Test(dataProvider = "opArrays", dataProviderClass = StreamTestDataProvider.class)
+    @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
     public void testSortUniq(String name, StreamTestData<Integer> data) {
         withData(data).ops().
                 using(new SortedOp<>(), UniqOp.singleton(), new TestParallelSizedOp<>()).
--- a/test-ng/tests/org/openjdk/tests/java/util/stream/primitive/IntStreamTestDataProvider.java	Mon Nov 19 17:56:46 2012 -0500
+++ b/test-ng/tests/org/openjdk/tests/java/util/stream/primitive/IntStreamTestDataProvider.java	Mon Nov 19 18:05:09 2012 -0500
@@ -91,9 +91,9 @@
         return new Object[] { description, m.combine(description, data) };
     }
 
-    // Return an array of ( String name, IntTestData<Integer> )
+    // Return an array of ( String name, IntStreamTestData<Integer> )
     @DataProvider(name = "IntStreamTestData")
-    public static Object[][] makeValueTestData() {
+    public static Object[][] makeIntStreamTestData() {
         return testData;
     }
 }