changeset 6441:2090a6765d9d

- builder for terminal tests
author psandoz
date Mon, 19 Nov 2012 21:35:36 +0100
parents 054fd74129f6
children ebfb2df4ab5b
files test-ng/tests/org/openjdk/tests/java/util/LambdaTestHelpers.java test-ng/tests/org/openjdk/tests/java/util/stream/OpTestCase.java test-ng/tests/org/openjdk/tests/java/util/stream/op/ConcatOpTest.java test-ng/tests/org/openjdk/tests/java/util/stream/op/FindAnyOpTest.java test-ng/tests/org/openjdk/tests/java/util/stream/op/ForEachOpTest.java test-ng/tests/org/openjdk/tests/java/util/stream/op/GroupByOpTest.java test-ng/tests/org/openjdk/tests/java/util/stream/op/TeeOpTest.java test-ng/tests/org/openjdk/tests/java/util/stream/op/ToArrayOpTest.java test-ng/tests/org/openjdk/tests/java/util/stream/op/UniqOpTest.java
diffstat 9 files changed, 235 insertions(+), 156 deletions(-) [+]
line wrap: on
line diff
--- a/test-ng/tests/org/openjdk/tests/java/util/LambdaTestHelpers.java	Mon Nov 19 13:38:57 2012 -0500
+++ b/test-ng/tests/org/openjdk/tests/java/util/LambdaTestHelpers.java	Mon Nov 19 21:35:36 2012 +0100
@@ -242,14 +242,10 @@
 
     public static <T> boolean equalsContentsUnordered(Iterable<T> a, Iterable<T> b) {
         Set<T> sa = new HashSet<>();
-        for (T t : a) {
-            sa.add(t);
-        }
+        a.forEach(sa::add);
 
         Set<T> sb = new HashSet<>();
-        for (T t : b) {
-            sb.add(t);
-        }
+        b.forEach(sb::add);
 
         return Objects.equals(sa, sb);
     }
--- a/test-ng/tests/org/openjdk/tests/java/util/stream/OpTestCase.java	Mon Nov 19 13:38:57 2012 -0500
+++ b/test-ng/tests/org/openjdk/tests/java/util/stream/OpTestCase.java	Mon Nov 19 21:35:36 2012 +0100
@@ -72,19 +72,19 @@
 
     public <T, U, S_IN extends BaseStream<T>, S_OUT extends BaseStream<U>> Node<U> exerciseOps(TestData<T, S_IN> data, Function<S_IN, S_OUT> m) {
         return withData(data).
-                using(m).
+                stream(m).
                 exercise();
     }
 
     public <T, U, S_OUT extends BaseStream<U>> Node<U> exerciseOps(Collection<T> data, Function<Stream<T>, S_OUT> m) {
         return withData(new StreamTestData.CollectionData<>("Collection of type " + data.getClass().getName(), data)).
-                using(m).
+                stream(m).
                 exercise();
     }
 
     public <T, U, S_OUT extends BaseStream<U>, I extends Iterable<U> & Sized> Node<U> exerciseOps(Collection<T> data, Function<Stream<T>, S_OUT> m, I expected) {
         return withData(new StreamTestData.CollectionData<>("Collection of type " + data.getClass().getName(), data)).
-                using(m).
+                stream(m).
                 expectedResult(expected).
                 exercise();
     }
@@ -106,9 +106,19 @@
             return new DataIntemediateOpBuilder<>(data);
         }
 
-        public <U, S_OUT extends BaseStream<U>> ExcerciseDataStreamBuilder<T, U, S_IN, S_OUT> using(Function<S_IN, S_OUT> m) {
+        public <U, S_OUT extends BaseStream<U>> ExcerciseDataStreamBuilder<T, U, S_IN, S_OUT> stream(Function<S_IN, S_OUT> m) {
             return new ExcerciseDataStreamBuilder<>(data, m);
         }
+
+        public <R> ExcerciseDataTerminalBuilder<T, T, R, S_IN, S_IN>
+        terminal(Function<S_IN, R> terminalF) {
+            return new ExcerciseDataTerminalBuilder<>(data, s -> s, terminalF);
+        }
+
+        public <U, R, S_OUT extends BaseStream<U>> ExcerciseDataTerminalBuilder<T, U, R, S_IN, S_OUT>
+        terminal(Function<S_IN, S_OUT> streamF, Function<S_OUT, R> terminalF) {
+            return new ExcerciseDataTerminalBuilder<>(data, streamF, terminalF);
+        }
     }
 
     @SuppressWarnings("rawtypes")
@@ -133,7 +143,7 @@
         }
     }
 
-    @SuppressWarnings("rawtypes")
+    @SuppressWarnings({"rawtypes", "unchecked"})
     public class ExcerciseDataStreamBuilder<T, U, S_IN extends BaseStream<T>, S_OUT extends BaseStream<U>> {
         final TestData<T, S_IN> data;
 
@@ -246,117 +256,159 @@
         // Build method
 
         public Node<U> exercise() {
-            return exerciseOps(this);
+            if (refResult == null) {
+                // Induce the reference result
+                before.accept(data);
+                S_OUT sOut = m.apply(data.stream());
+                refResult = ((AbstractPipeline<?, U>) sOut).collectOutput().flatten();
+                after.accept(data);
+            }
+
+            for (BaseStreamTestScenario test : testSet) {
+                before.accept(data);
+
+                NodeBuilder<U> resultBuilder = shape.makeNodeBuilder(-1);
+                resultBuilder.begin(-1);
+                test.run(data, resultBuilder, m);
+                resultBuilder.end();
+                Node<U> result = resultBuilder.build();
+
+                assertTrue(getEqualator(test).test(result, refResult),
+                           String.format("%s %s: %s != %s", data.toString(), test, refResult, result));
+
+                after.accept(data);
+            }
+
+            return refResult;
         }
     }
 
-    @SuppressWarnings({"rawtypes", "unchecked"})
-    public <T, U, S_IN extends BaseStream<T>, S_OUT extends BaseStream<U>> Node<U> exerciseOps(ExcerciseDataStreamBuilder<T, U, S_IN, S_OUT> b) {
-        Node<U> refResult;
-        if (b.refResult != null) {
-            refResult = b.refResult;
-        }
-        else {
-            // Induce the reference result
-            b.before.accept(b.data);
-            S_OUT sOut = b.m.apply(b.data.stream());
-            refResult = ((AbstractPipeline<?, U>) sOut).collectOutput().flatten();
-            b.after.accept(b.data);
-        }
-
-        for (BaseStreamTestScenario test : b.testSet) {
-            b.before.accept(b.data);
-
-            NodeBuilder<U> resultBuilder = b.shape.makeNodeBuilder(-1);
-            resultBuilder.begin(-1);
-            test.run(b.data, resultBuilder, b.m);
-            resultBuilder.end();
-            Node<U> result = resultBuilder.build();
-
-            assertTrue(b.getEqualator(test).test(result, refResult),
-                       String.format("%s %s: %s != %s", b.data.toString(), test, refResult, result));
-
-            b.after.accept(b.data);
-        }
-
-        return refResult;
-    }
-
     // Exercise terminal operations
 
-    public <T, U, R, S_IN extends BaseStream<T>> R
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    public class ExcerciseDataTerminalBuilder<T, U, R, S_IN extends BaseStream<T>, S_OUT extends BaseStream<U>> {
+        final TestData<T, S_IN> data;
+
+        final Function<S_IN, S_OUT> streamF;
+
+        final Function<S_OUT, R> terminalF;
+
+        R refResult;
+
+        BiPredicate<R, R> sequentialEqualator = Objects::equals;
+
+        BiPredicate<R, R> parallelEqualator = Objects::equals;
+
+        private ExcerciseDataTerminalBuilder(TestData<T, S_IN> data, Function<S_IN, S_OUT> streamF, Function<S_OUT, R> terminalF) {
+            this.data = data;
+
+            this.streamF = Objects.requireNonNull(streamF);
+
+            this.terminalF = Objects.requireNonNull(terminalF);
+        }
+
+        //
+
+        @SuppressWarnings("unchecked")
+        public <I extends Iterable<U> & Sized> ExcerciseDataTerminalBuilder<T, U, R, S_IN, S_OUT> expectedResult(R expectedResult) {
+            this.refResult = expectedResult;
+            return this;
+        }
+
+        public ExcerciseDataTerminalBuilder<T, U, R, S_IN, S_OUT> equalator(BiPredicate<R, R> equalator) {
+            this.sequentialEqualator = equalator;
+            this.parallelEqualator = equalator;
+            return this;
+        }
+
+        public ExcerciseDataTerminalBuilder<T, U, R, S_IN, S_OUT> sequentialEqualator(BiPredicate<R, R> equalator) {
+            this.sequentialEqualator = equalator;
+            return this;
+        }
+
+        public ExcerciseDataTerminalBuilder<T, U, R, S_IN, S_OUT> parallelEqualator(BiPredicate<R, R> equalator) {
+            this.parallelEqualator = equalator;
+            return this;
+        }
+
+        // Build method
+
+        public R exercise() {
+            AbstractPipeline<?, U> ap = (AbstractPipeline<?, U>) streamF.apply(data.stream());
+            StreamShape shape = ap.getOutputShape();
+
+            Node<U> node = ap.collectOutput();
+            if (refResult == null) {
+                // Sequentially collect the output that will be input to the terminal op
+                refResult = terminalF.apply((S_OUT) shape.stream(node));
+            }
+
+            // Single sequential using pull
+            {
+                // @@@ Using specific stream implementation
+                AbstractPipeline<?, U> source = shape.stream(node);
+                // Force pull mode
+                source = source.pipeline(new PullOnlyOp<U>(shape));
+                R result = terminalF.apply((S_OUT) source);
+                assertTrue(sequentialEqualator.test(refResult, result), String.format("Single sequential pull: %s != %s", refResult, result));
+            }
+
+            // Single parallel
+            {
+                // @@@ Using specific stream implementation
+                R result = terminalF.apply((S_OUT) shape.parallel(node));
+                assertTrue(parallelEqualator.test(refResult, result), String.format("Single parallel: %s != %s", refResult, result));
+            }
+
+            // All sequential
+            // This may push or pull depending on the terminal op implementation
+            {
+                R result = terminalF.apply(streamF.apply(data.stream()));
+                assertTrue(sequentialEqualator.test(refResult, result), String.format("All sequential: %s != %s", refResult, result));
+            }
+
+            // All sequential using pull
+            {
+                S_OUT source = streamF.apply(data.stream());
+                // Force pull mode
+                source = (S_OUT) ((AbstractPipeline) source).pipeline(new PullOnlyOp<U>(shape));
+                R result = terminalF.apply(source);
+                assertTrue(sequentialEqualator.test(refResult, result), String.format("All sequential pull: %s != %s", refResult, result));
+            }
+
+            // All parallel
+            {
+                R result = terminalF.apply(streamF.apply(data.parallel()));
+                assertTrue(parallelEqualator.test(refResult, result), String.format("All parallel: %s != %s", refResult, result));
+            }
+
+            return refResult;
+        }
+    }
+
+    public <T, R> R exerciseTerminalOps(Collection<T> data, Function<Stream<T>, R> m, R expected) {
+        return withData(new StreamTestData.CollectionData<>("Collection of type " + data.getClass().getName(), data)).
+                terminal(m).
+                expectedResult(expected).
+                exercise();
+    }
+
+    public <T, R, S_IN extends BaseStream<T>> R
     exerciseTerminalOps(TestData<T, S_IN> data,
                         Function<S_IN, R> terminalF) {
-        return exerciseTerminalOps(data,
-                                   s -> s,
-                                   terminalF);
+        return withData(data).
+                terminal(s -> s, terminalF).
+                exercise();
     }
 
     @SuppressWarnings({"rawtypes", "unchecked"})
     public <T, U, R, S_IN extends BaseStream<T>, S_OUT extends BaseStream<U>> R
     exerciseTerminalOps(TestData<T, S_IN> data,
-                        Function<S_IN, S_OUT> m,
-                        Function<S_OUT, R> terminalF) {
-        return exerciseTerminalOps(data,
-                                   (u, v) -> Objects.equals(u, v),
-                                   m,
-                                   terminalF);
-    }
-
-    @SuppressWarnings({"rawtypes", "unchecked"})
-    public <T, U, R, S_IN extends BaseStream<T>, S_OUT extends BaseStream<U>> R
-    exerciseTerminalOps(TestData<T, S_IN> data,
-                        BiPredicate<R, R> equalator,
                         Function<S_IN, S_OUT> streamF,
                         Function<S_OUT, R> terminalF) {
-
-        AbstractPipeline<?, U> ap = (AbstractPipeline<?, U>) streamF.apply(data.stream());
-        StreamShape shape = ap.getOutputShape();
-
-        // Sequentially collect the output that will be input to the terminal op
-        Node<U> node = ap.collectOutput();
-        R refResult = terminalF.apply((S_OUT) shape.stream(node));
-
-        // Single sequential using pull
-        {
-            // @@@ Using specific stream implementation
-            AbstractPipeline<?, U> source = shape.stream(node);
-            // Force pull mode
-            source = source.pipeline(new PullOnlyOp<U>(shape));
-            R result = terminalF.apply((S_OUT) source);
-            assertTrue(equalator.test(refResult, result), String.format("Single sequential pull: %s != %s", refResult, result));
-        }
-
-        // Single parallel
-        {
-            // @@@ Using specific stream implementation
-            R result = terminalF.apply((S_OUT) shape.parallel(node));
-            assertTrue(equalator.test(refResult, result), String.format("Single parallel: %s != %s", refResult, result));
-        }
-
-        // All sequential
-        // This may push or pull depending on the terminal op implementation
-        {
-            R result = terminalF.apply(streamF.apply(data.stream()));
-            assertTrue(equalator.test(refResult, result), String.format("All sequential: %s != %s", refResult, result));
-        }
-
-        // All sequential using pull
-        {
-            S_OUT source = streamF.apply(data.stream());
-            // Force pull mode
-            source = (S_OUT) ((AbstractPipeline) source).pipeline(new PullOnlyOp<U>(shape));
-            R result = terminalF.apply(source);
-            assertTrue(equalator.test(refResult, result), String.format("All sequential pull: %s != %s", refResult, result));
-        }
-
-        // All parallel
-        {
-            R result = terminalF.apply(streamF.apply(data.parallel()));
-            assertTrue(equalator.test(refResult, result), String.format("All parallel: %s != %s", refResult, result));
-        }
-
-        return refResult;
+        return withData(data).
+                terminal(streamF, terminalF).
+                exercise();
     }
 
     //
--- a/test-ng/tests/org/openjdk/tests/java/util/stream/op/ConcatOpTest.java	Mon Nov 19 13:38:57 2012 -0500
+++ b/test-ng/tests/org/openjdk/tests/java/util/stream/op/ConcatOpTest.java	Mon Nov 19 21:35:36 2012 +0100
@@ -29,11 +29,9 @@
 import org.openjdk.tests.java.util.stream.StreamTestData;
 import org.openjdk.tests.java.util.stream.StreamTestDataProvider;
 import org.testng.annotations.Test;
-import sun.jvmstat.monitor.IntegerMonitor;
 
 import java.util.*;
 import java.util.stream.Streams;
-import java.util.stream.op.ConcatOp;
 
 import static org.openjdk.tests.java.util.LambdaTestHelpers.*;
 
@@ -107,14 +105,14 @@
     @Test(dataProvider = "opArrays", dataProviderClass = StreamTestDataProvider.class)
     public void testOpsSequential(String name, StreamTestData<Integer> data) {
         withData(data).
-                using(s -> s.concat(data.stream())).
+                stream(s -> s.concat(data.stream())).
                 exercise();
     }
 
     @Test(dataProvider = "opArrays", dataProviderClass = StreamTestDataProvider.class)
     public void testOpsParallel(String name, StreamTestData<Integer> data) {
         withData(data).
-                using(s -> s.concat(data.parallel())).
+                stream(s -> s.concat(data.parallel())).
                 exercise();
     }
 }
--- a/test-ng/tests/org/openjdk/tests/java/util/stream/op/FindAnyOpTest.java	Mon Nov 19 13:38:57 2012 -0500
+++ b/test-ng/tests/org/openjdk/tests/java/util/stream/op/FindAnyOpTest.java	Mon Nov 19 21:35:36 2012 +0100
@@ -67,8 +67,9 @@
         // @@@ Weak test -- only tests that all versions either find an answer or don't, don't assert validity of answer
         // Would be good to test that the result is actually a member of the stream
         BiPredicate<Optional<Integer>, Optional<Integer>> validAnswer = (a, b) -> a.isPresent() == b.isPresent();
-        exerciseTerminalOps(data, validAnswer, s -> s, s -> s.findAny());
-        exerciseTerminalOps(data, validAnswer, s -> s.filter(pTrue), s -> s.findAny());
-        exerciseTerminalOps(data, validAnswer, s -> s.filter(pEven), s -> s.findAny());
+
+        withData(data).terminal(s -> s.findAny()).equalator(validAnswer).exercise();
+        withData(data).terminal(s -> s.filter(pTrue), s -> s.findAny()).equalator(validAnswer).exercise();
+        withData(data).terminal(s -> s.filter(pEven), s -> s.findAny()).equalator(validAnswer).exercise();
     }
 }
--- a/test-ng/tests/org/openjdk/tests/java/util/stream/op/ForEachOpTest.java	Mon Nov 19 13:38:57 2012 -0500
+++ b/test-ng/tests/org/openjdk/tests/java/util/stream/op/ForEachOpTest.java	Mon Nov 19 21:35:36 2012 +0100
@@ -25,10 +25,15 @@
 package org.openjdk.tests.java.util.stream.op;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Stream;
 import java.util.stream.op.ForEachOp;
+import java.util.stream.primitive.IntNodeBuilder;
+import java.util.stream.primitive.IntNodes;
 
+import org.openjdk.tests.java.util.LambdaTestHelpers;
 import org.openjdk.tests.java.util.stream.OpTestCase;
 import org.openjdk.tests.java.util.stream.StreamTestData;
 import org.openjdk.tests.java.util.stream.StreamTestDataProvider;
@@ -44,26 +49,39 @@
 @Test
 public class ForEachOpTest extends OpTestCase {
 
-    public void testForEachParallel() {
-        AtomicInteger count = new AtomicInteger(0);
-        AtomicInteger sum = new AtomicInteger(0);
-        countTo(10).parallel().forEach(e -> { count.incrementAndGet(); sum.addAndGet(e); });
-        assertEquals(count.get(), 10);
-        assertEquals(sum.get(), 55);
-    }
+    public void testForEach() {
+        exerciseTerminalOps(countTo(10),
+                            s -> {
+                                AtomicInteger count = new AtomicInteger(0);
+                                s.forEach(e -> {
+                                    count.incrementAndGet();
+                                });
+                                return count.get();
+                            },
+                            10);
 
-    public void testForEachStream() {
-        List<Integer> counted = countTo(10);
-        List<Integer> eached = new ArrayList<>();
-        counted.stream().forEach(e -> { eached.add(e); });
-        assertContents(eached.iterator(), counted.iterator());
+        exerciseTerminalOps(countTo(10),
+                            s -> {
+                                AtomicInteger sum = new AtomicInteger(0);
+                                s.forEach(e -> {
+                                    sum.addAndGet(e);
+                                });
+                                return sum.get();
+                            },
+                            55);
     }
 
     @Test(dataProvider = "opArrays", dataProviderClass = StreamTestDataProvider.class)
     @SuppressWarnings("unchecked")
     public void testOps(String name, StreamTestData<Integer> data) {
-       // @@@ assert contents, unordered or parallel evaluation
-       exerciseTerminalOps(data, s -> { s.forEach(bEmpty); return null; });
+        withData(data).terminal(s -> {
+            List<Integer> l = Collections.synchronizedList(new ArrayList<Integer>());
+            s.forEach(e -> {
+                l.add(e);
+            });
+            return l;
+        }).parallelEqualator(LambdaTestHelpers::equalsContentsUnordered).exercise();
+
     }
 
     // @@@ Move out into separate test class
@@ -71,7 +89,16 @@
     @Test(dataProvider = "IntStreamTestData", dataProviderClass = IntStreamTestDataProvider.class)
     @SuppressWarnings("unchecked")
     public void testIntOps(String name, IntStreamTestData data) {
-        // @@@ assert contents, unordered or parallel evaluation
-        exerciseTerminalOps(data, s -> { s.forEach(bIntEmpty); return null; });
+        withData(data).terminal(s -> {
+            IntNodeBuilder nb = IntNodes.makeBuilder(-1);
+            nb.begin(-1);
+            s.forEach(e -> {
+                synchronized (nb) {
+                    nb.accept(e);
+                }
+            });
+            nb.end();
+            return nb.build();
+        }).parallelEqualator(LambdaTestHelpers::equalsContentsUnordered).exercise();
     }
 }
--- a/test-ng/tests/org/openjdk/tests/java/util/stream/op/GroupByOpTest.java	Mon Nov 19 13:38:57 2012 -0500
+++ b/test-ng/tests/org/openjdk/tests/java/util/stream/op/GroupByOpTest.java	Mon Nov 19 21:35:36 2012 +0100
@@ -124,10 +124,10 @@
         //     - Total number of values equals size of data
 
         for (MapperData<Integer, ?> md : getMapperData(data)) {
-            Map<?, Collection<Integer>> result = exerciseTerminalOps(data,
-                                                             this::multiMapEquals,
-                                                             s -> s.unordered(),
-                                                             s -> s.groupBy(md.m));
+            Map<?, Collection<Integer>> result = withData(data).
+                    terminal(s -> s.unordered(), s -> s.groupBy(md.m)).
+                    parallelEqualator(this::multiMapEquals).
+                    exercise();
             assertEquals(result.keySet().size(), md.expectedSize);
         }
     }
--- a/test-ng/tests/org/openjdk/tests/java/util/stream/op/TeeOpTest.java	Mon Nov 19 13:38:57 2012 -0500
+++ b/test-ng/tests/org/openjdk/tests/java/util/stream/op/TeeOpTest.java	Mon Nov 19 21:35:36 2012 +0100
@@ -103,7 +103,7 @@
         final RecordingBlock b = new RecordingBlock();
 
         withData(data).
-                using(s -> s.tee(b)).
+                stream(s -> s.tee(b)).
                 before(b::before).
                 after(b::after).
                 exercise();
--- a/test-ng/tests/org/openjdk/tests/java/util/stream/op/ToArrayOpTest.java	Mon Nov 19 13:38:57 2012 -0500
+++ b/test-ng/tests/org/openjdk/tests/java/util/stream/op/ToArrayOpTest.java	Mon Nov 19 21:35:36 2012 +0100
@@ -54,30 +54,34 @@
 
     @Test(dataProvider = "opArrays", dataProviderClass = StreamTestDataProvider.class)
     public void testOps(String name, StreamTestData<Integer> data) {
-        exerciseTerminalOps(data, Arrays::equals,
-                            s -> s,
-                            s -> s.toArray());
+        withData(data).
+                terminal(s -> s.toArray()).
+                equalator(Arrays::equals).
+                exercise();
     }
 
     @Test(dataProvider = "opArrays", dataProviderClass = StreamTestDataProvider.class)
     public void testOpsWithMap(String name, StreamTestData<Integer> data) {
         // Retain the size of the source
         // This should kick in the parallel evaluation optimization for tasks stuffing elements into a shared array
-        Object[] objects = exerciseTerminalOps(data, Arrays::equals,
-                                               s -> s.map(i -> (Integer) (i + i)),
-                                               s -> s.toArray());
+        Object[] objects = withData(data).
+                terminal(s -> s.map(i -> (Integer) (i + i)), s -> s.toArray()).
+                equalator(Arrays::equals).
+                exercise();
         assertTrue(objects.length == data.size());
     }
 
     @Test(dataProvider = "opArrays", dataProviderClass = StreamTestDataProvider.class)
     public void testOpsWithFlatMap(String name, StreamTestData<Integer> data) {
         // Double the size of the source
-        Object[] objects = exerciseTerminalOps(data, Arrays::equals,
-                                               s -> s.flatMap((Block<? super Integer> b, Integer e) -> {
-                                                   b.accept(e);
-                                                   b.accept(e);
-                                               }),
-                                               s -> s.toArray());
+
+        Object[] objects = withData(data).
+                terminal(s -> s.flatMap((Block<? super Integer> b, Integer e) -> {
+                    b.accept(e);
+                    b.accept(e);
+                }), s -> s.toArray()).
+                equalator(Arrays::equals).
+                exercise();
         assertTrue(objects.length == data.size() * 2);
     }
 
@@ -85,9 +89,10 @@
     public void testOpsWithSorted(String name, StreamTestData<Integer> data) {
         // Retain the size of the source
         // This should kick in the parallel evaluation optimization for tasks stuffing elements into a shared array
-        Object[] objects = exerciseTerminalOps(data, Arrays::equals,
-                                               s -> s.sorted(cInteger),
-                                               s -> s.toArray());
+        Object[] objects = withData(data).
+                terminal(s -> s.sorted(cInteger), s -> s.toArray()).
+                equalator(Arrays::equals).
+                exercise();
         assertTrue(objects.length == data.size());
     }
 
@@ -95,9 +100,10 @@
     public void testOpsWithCumulate(String name, StreamTestData<Integer> data) {
         // Retain the size of the source
         // This should kick in the parallel evaluation optimization for tasks stuffing elements into a shared array
-        Object[] objects = exerciseTerminalOps(data, Arrays::equals,
-                                               s -> s.cumulate(rPlus),
-                                               s -> s.toArray());
+        Object[] objects = withData(data).
+                terminal(s -> s.cumulate(rPlus), s -> s.toArray()).
+                equalator(Arrays::equals).
+                exercise();
         assertTrue(objects.length == data.size());
     }
 
--- a/test-ng/tests/org/openjdk/tests/java/util/stream/op/UniqOpTest.java	Mon Nov 19 13:38:57 2012 -0500
+++ b/test-ng/tests/org/openjdk/tests/java/util/stream/op/UniqOpTest.java	Mon Nov 19 21:35:36 2012 +0100
@@ -31,7 +31,6 @@
 import org.testng.annotations.Test;
 
 import java.util.stream.ParallelPipelineHelper;
-import java.util.stream.Stream;
 import java.util.stream.StreamOpFlags;
 import java.util.stream.op.*;
 
@@ -68,7 +67,7 @@
     @Test(dataProvider = "opArrays", dataProviderClass = StreamTestDataProvider.class)
     public void testOpUnordered(String name, StreamTestData<Integer> data) {
         Node<Integer> result = withData(data).
-                using(s -> s.unordered().uniqueElements()).
+                stream(s -> s.unordered().uniqueElements()).
                 parallelEqualator(LambdaTestHelpers::equalsContentsUnordered).
                 exercise();