changeset 6077:c6f42702e95f

Implement UniqOp.evaluateParallel, which currently does not preserve encounter order. Extenstions to testing infrastructure.
author psandoz
date Mon, 15 Oct 2012 15:31:14 -0700
parents 9ee3890658f9
children 121debc3d4be
files src/share/classes/java/util/streams/ops/UniqOp.java test-ng/tests/org/openjdk/tests/java/util/LambdaTestHelpers.java test-ng/tests/org/openjdk/tests/java/util/streams/ops/StreamOpTestCase.java test-ng/tests/org/openjdk/tests/java/util/streams/ops/UniqOpTest.java
diffstat 4 files changed, 147 insertions(+), 46 deletions(-) [+]
line wrap: on
line diff
--- a/src/share/classes/java/util/streams/ops/UniqOp.java	Mon Oct 15 15:20:35 2012 -0700
+++ b/src/share/classes/java/util/streams/ops/UniqOp.java	Mon Oct 15 15:31:14 2012 -0700
@@ -25,8 +25,11 @@
 package java.util.streams.ops;
 
 import java.util.*;
-import java.util.streams.Sink;
-import java.util.streams.Stream;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountedCompleter;
+import java.util.streams.*;
+
+import static java.util.streams.ops.TreeUtils.node;
 
 /**
  * An stateful operation which eliminates duplicates from the stream.
@@ -117,8 +120,54 @@
         };
     }
 
-//    @Override
-//    public <V> ParallelStreamable<T> computeParallel(ParallelStreamable<V> source, ParallelOpHelper<T, V> helper) {
-//        throw new UnsupportedOperationException("nyi");
-//    }
+    @Override
+    public <S> TreeUtils.Node<T> evaluateParallel(ParallelPipelineHelper<S, T> helper) {
+        final ConcurrentHashMap<T, Boolean> map = new ConcurrentHashMap();
+
+        // Cache the sink chain, so it can be reused by all F/J leaf tasks
+        Sink<S, ?, ?> sinkChain = helper.wrapSink(new Sink.OfValue<T>() {
+            @Override
+            public void accept(T t) {
+                map.putIfAbsent(t, Boolean.TRUE);
+            }
+        });
+
+        UniqTask<S, T> task = new UniqTask<>(helper, sinkChain);
+        helper.invoke(task);
+
+        // @@@ Not very efficient
+        return node((T[])map.keySet().toArray());
+    }
+
+    // @@@ This is a lot of boiler plate for "spliterator.into(sinkChain)"
+    //     consider abstracting to a static method on AbstractTask that takes a Block<Void>
+    private static class UniqTask<S, T> extends AbstractTask<S, Void, UniqTask<S, T>> {
+        private final ParallelPipelineHelper<S, T> helper;
+
+        private final Sink<S, ?, ?> sinkChain;
+
+        private UniqTask(ParallelPipelineHelper<S, T> helper, Sink<S, ?, ?> sinkChain) {
+            super(helper);
+            this.helper = helper;
+            this.sinkChain = sinkChain;
+        }
+
+        private UniqTask(UniqTask<S, T> parent, Spliterator<S> spliterator) {
+            super(parent, spliterator);
+            this.helper = parent.helper;
+            this.sinkChain = parent.sinkChain;
+        }
+
+        @Override
+        protected UniqTask<S, T> makeChild(Spliterator<S> spliterator) {
+            return new UniqTask<>(this, spliterator);
+        }
+
+        @Override
+        protected Void doLeaf() {
+            spliterator.into(sinkChain);
+            return null;
+        }
+    }
+
 }
--- a/test-ng/tests/org/openjdk/tests/java/util/LambdaTestHelpers.java	Mon Oct 15 15:20:35 2012 -0700
+++ b/test-ng/tests/org/openjdk/tests/java/util/LambdaTestHelpers.java	Mon Oct 15 15:31:14 2012 -0700
@@ -223,6 +223,20 @@
         assertContents(actual, Arrays.asList(expected).iterator());
     }
 
+    public static <T> boolean equalsContentsUnordered(Iterable<T> a, Iterable<T> b) {
+        Set<T> sa = new HashSet<>();
+        for (T t : a) {
+            sa.add(t);
+        }
+
+        Set<T> sb = new HashSet<>();
+        for (T t : b) {
+            sb.add(t);
+        }
+
+        return Objects.equals(sa, sb);
+    }
+
     public static<T extends Comparable<? super T>> void assertContentsUnordered(Iterable<T> actual, Iterable<T> expected) {
         ArrayList<T> one = new ArrayList<>();
         for (T t : actual)
--- a/test-ng/tests/org/openjdk/tests/java/util/streams/ops/StreamOpTestCase.java	Mon Oct 15 15:20:35 2012 -0700
+++ b/test-ng/tests/org/openjdk/tests/java/util/streams/ops/StreamOpTestCase.java	Mon Oct 15 15:31:14 2012 -0700
@@ -48,18 +48,19 @@
 @Test
 public abstract class StreamOpTestCase extends Assert {
 
+    @SuppressWarnings("unchecked")
     protected static <T, U> StreamResult<U> exerciseOps(TestData<T> data, IntermediateOp... ops) {
-        return testUsingData(data).excerciseOps(ops);
+        return (StreamResult) testUsingData(data).excerciseOps(ops);
     }
 
-    protected static <T> IntermediateOpTestBuilder<T> testUsingData(TestData<T> data) {
+    protected static <T, U> IntermediateOpTestBuilder<T, U> testUsingData(TestData<T> data) {
         return new IntermediateOpTestBuilder<>(data);
     }
 
     @SuppressWarnings({"rawtypes", "unchecked"})
     public static enum IntermediateOpTest {
         // Create a sink and wrap it
-        DATA_FOR_EACH_TO_WRAPPED_SINK {
+        DATA_FOR_EACH_TO_WRAPPED_SINK(false) {
             boolean isApplicable(IntermediateOp[] ops) {
                 return !isShortCircuit(ops);
             }
@@ -73,7 +74,7 @@
         },
 
         // Create a sink and wrap it, report the size as unknown
-        DATA_FOR_EACH_TO_WRAPPED_SINK_SIZE_UNKNOWN {
+        DATA_FOR_EACH_TO_WRAPPED_SINK_SIZE_UNKNOWN(false) {
             boolean isApplicable(IntermediateOp[] ops) {
                 return !isShortCircuit(ops);
             }
@@ -87,14 +88,14 @@
         },
 
         // Wrap with SequentialPipeline.op, and iterate in push mode
-        STREAM_FOR_EACH {
+        STREAM_FOR_EACH(false) {
             <T> void run(TestData<T> data, Sink sink, IntermediateOp[] ops) {
                 stream(data.seq(ops)).forEach(sink);
             }
         },
 
         // Wrap as stream, and iterate in pull mode
-        STREAM_ITERATOR {
+        STREAM_ITERATOR(false) {
             <T> void run(TestData<T> data, Sink sink, IntermediateOp[] ops) {
                 for (Iterator<?> seqIter = data.seq(ops).iterator(); seqIter.hasNext(); )
                     sink.accept(seqIter.next());
@@ -102,7 +103,7 @@
         },
 
         // Wrap as stream, and iterate in mixed mode
-        STREAM_ITERATOR_FOR_EACH {
+        STREAM_ITERATOR_FOR_EACH(false) {
             <T> void run(TestData<T> data, Sink sink, IntermediateOp[] ops) {
                 Stream<?> stream = stream(data.seq(ops));
                 Iterator<?> iter = stream.iterator();
@@ -113,14 +114,14 @@
         },
 
         // Wrap as parallel stream + sequential
-        PAR_STREAM_SEQUENTIAL_FOR_EACH {
+        PAR_STREAM_SEQUENTIAL_FOR_EACH(true) {
             <T> void run(TestData<T> data, Sink sink, IntermediateOp[] ops) {
                 stream(data.par(ops)).sequential().forEach(sink);
             }
         },
 
         // Wrap as parallel stream + toArray
-        PAR_STREAM_TO_ARRAY {
+        PAR_STREAM_TO_ARRAY(true) {
             <T> void run(TestData<T> data, Sink sink, IntermediateOp[] ops) {
                 for (Object t : stream(data.par(ops)).toArray())
                     sink.accept(t);
@@ -128,7 +129,7 @@
         },
 
         // Wrap as parallel stream + into
-        PAR_STREAM_SEQUENTIAL_INTO {
+        PAR_STREAM_SEQUENTIAL_INTO(true) {
             <T> void run(TestData<T> data, Sink sink, IntermediateOp[] ops) {
                 ArrayList list = stream(data.par(ops)).sequential().into(new ArrayList());
                 for (Object u : list)
@@ -140,6 +141,16 @@
         // Extends testing to test whether computation happens in- or out-of-thread
         ;
 
+        private boolean isParallel;
+
+        IntermediateOpTest(boolean isParallel) {
+            this.isParallel = isParallel;
+        }
+
+        boolean isParallel() {
+            return isParallel;
+        }
+
         boolean isApplicable(IntermediateOp[] ops) {
             return true;
         }
@@ -147,100 +158,121 @@
         abstract <T> void run(TestData<T> data, Sink s, IntermediateOp[] ops);
     }
 
-    public static class IntermediateOpTestBuilder<T> {
+    public static class IntermediateOpTestBuilder<T, U> {
         final TestData<T> data;
+
         EnumSet<IntermediateOpTest> testSet = EnumSet.allOf(IntermediateOpTest.class);
+
         @SuppressWarnings("unchecked")
         Block<TestData<T>> before = LambdaTestHelpers.bEmpty;
+
         @SuppressWarnings("unchecked")
         Block<TestData<T>> after = LambdaTestHelpers.bEmpty;
 
+        BiPredicate<Iterable<U>, Iterable<U>> sequentialEqualator = Objects::equals;
+
+        BiPredicate<Iterable<U>, Iterable<U>> parallelEqualator = Objects::equals;
+
         private IntermediateOpTestBuilder(TestData<T> data) {
             this.data = Objects.requireNonNull(data);
         }
 
-        public IntermediateOpTestBuilder<T> before(Block<TestData<T>> before) {
+        public IntermediateOpTestBuilder<T, U> before(Block<TestData<T>> before) {
             this.before = Objects.requireNonNull(before);
             return this;
         }
 
-        public IntermediateOpTestBuilder<T> after(Block<TestData<T>> after) {
+        public IntermediateOpTestBuilder<T, U> after(Block<TestData<T>> after) {
             this.after = Objects.requireNonNull(after);
             return this;
         }
 
-        public IntermediateOpTestBuilder<T> without(IntermediateOpTest... tests) {
+        public IntermediateOpTestBuilder<T, U> without(IntermediateOpTest... tests) {
             return without(Arrays.asList(tests));
         }
 
-        public IntermediateOpTestBuilder<T> without(Collection<IntermediateOpTest> tests) {
+        public IntermediateOpTestBuilder<T, U> without(Collection<IntermediateOpTest> tests) {
             testSet = EnumSet.complementOf(EnumSet.copyOf(tests));
             return this;
         }
 
-        public IntermediateOpTestBuilder<T> with(IntermediateOpTest... tests) {
+        public IntermediateOpTestBuilder<T, U> with(IntermediateOpTest... tests) {
             return with(Arrays.asList(tests));
         }
 
-        public IntermediateOpTestBuilder<T> with(Collection<IntermediateOpTest> tests) {
+        public IntermediateOpTestBuilder<T, U> with(Collection<IntermediateOpTest> tests) {
             testSet = EnumSet.copyOf(tests);
             return this;
         }
 
-        public <U> StreamResult<U> excerciseOps(IntermediateOp... ops) {
+        public IntermediateOpTestBuilder<T, U>  sequentialEqualator(BiPredicate<Iterable<U>, Iterable<U>> equalator) {
+            this.sequentialEqualator = equalator;
+            return this;
+        }
+
+        public IntermediateOpTestBuilder<T, U>  parallelEqualator(BiPredicate<Iterable<U>, Iterable<U>> equalator) {
+            this.parallelEqualator = equalator;
+            return this;
+        }
+
+        public StreamResult<U> excerciseOps(IntermediateOp... ops) {
             Objects.requireNonNull(ops);
             return excerciseMultipleOpsFactory(() -> ops);
         }
 
         @SuppressWarnings("rawtypes")
-        public <U> StreamResult<U> excerciseSingleOpFactory(Factory<IntermediateOp> fop) {
+        public StreamResult<U> excerciseSingleOpFactory(Factory<IntermediateOp> fop) {
             Objects.requireNonNull(fop);
             return excerciseMultipleOpsFactory(() -> new IntermediateOp[] { fop.make() });
         }
 
         @SuppressWarnings("rawtypes")
-        public <U> StreamResult<U> excerciseMultipleOpsFactory(Factory<IntermediateOp[]> fops) {
+        public StreamResult<U> excerciseMultipleOpsFactory(Factory<IntermediateOp[]> fops) {
             Objects.requireNonNull(fops);
-            return exerciseOps(data, testSet, before, after, fops);
+            return exerciseOps(this, fops);
+        }
+
+        public BiPredicate<Iterable<U>, Iterable<U>> getEqualator(IntermediateOpTest t) {
+            return t.isParallel() ? parallelEqualator : sequentialEqualator;
         }
     }
 
     @SuppressWarnings({ "rawtypes", "unchecked" })
-    protected static <T, U> StreamResult<U> exerciseOps(TestData<T> data,
-                                                        EnumSet<IntermediateOpTest> testSet,
-                                                        Block<TestData<T>> before,
-                                                        Block<TestData<T>> after,
+    protected static <T, U> StreamResult<U> exerciseOps(IntermediateOpTestBuilder<T, U> b,
                                                         Factory<IntermediateOp[]> fops) {
-        StreamResult<U> refResult = new StreamResult<>(data.size());
+        StreamResult<U> refResult = new StreamResult<>(b.data.size());
 
         // First pass -- grab an iterator and wrap it, and call that the reference result
-        before.apply(data);
-        Iterator<U> it = (Iterator<U>) data.iterator(fops.make());
+        b.before.apply(b.data);
+        Iterator<U> it = (Iterator<U>) b.data.iterator(fops.make());
         while (it.hasNext())
             refResult.accept(it.next());
-        after.apply(data);
+        b.after.apply(b.data);
 
-        for (IntermediateOpTest test : testSet) {
+        for (IntermediateOpTest test : b.testSet) {
             IntermediateOp[] ops = fops.make();
             if (test.isApplicable(ops)) {
-                before.apply(data);
-                assertMatches(refResult, sink -> test.run(data, sink, ops), data.toString() + " " + test + " " + Arrays.toString(ops) );
-                after.apply(data);
+                b.before.apply(b.data);
+                assertMatches(refResult,
+                              b.getEqualator(test),
+                              sink -> test.run(b.data, sink, ops),
+                              b.data.toString() + " " + test + " " + Arrays.toString(ops));
+                b.after.apply(b.data);
             }
         }
 
         return refResult;
     }
 
-    protected static<T> void assertMatches(StreamResult<T> refResult, Block<Sink<T,?,?>> block) {
-        assertMatches(refResult, block, null);
-    }
-
     @SuppressWarnings("unchecked")
-    private static<T> void assertMatches(StreamResult<T> refResult, Block<Sink<T,?,?>> block, String message) {
+    private static<T> void assertMatches(StreamResult<T> refResult,
+                                         BiPredicate<Iterable<T>, Iterable<T>> equalator,
+                                         Block<Sink<T,?,?>> block,
+                                         String message) {
         StreamResult<T> newResult = new StreamResult<>(refResult.size());
         block.apply(newResult);
-        assertEquals(newResult, refResult, message);
+
+        assertTrue(equalator.test(newResult, refResult), message);
     }
 
     @SuppressWarnings("rawtypes")
--- a/test-ng/tests/org/openjdk/tests/java/util/streams/ops/UniqOpTest.java	Mon Oct 15 15:20:35 2012 -0700
+++ b/test-ng/tests/org/openjdk/tests/java/util/streams/ops/UniqOpTest.java	Mon Oct 15 15:31:14 2012 -0700
@@ -24,6 +24,7 @@
  */
 package org.openjdk.tests.java.util.streams.ops;
 
+import org.openjdk.tests.java.util.LambdaTestHelpers;
 import org.openjdk.tests.java.util.streams.StreamTestDataProvider;
 import org.testng.annotations.Test;
 
@@ -57,7 +58,10 @@
     public void testOps(String name, TestData<Integer> data) {
         UniqOp<Integer> op = UniqOp.singleton();
 
-        StreamResult<Integer> result = exerciseOps(data, op);
+        StreamResult<Integer> result = StreamOpTestCase.<Integer, Integer>testUsingData(data).
+                parallelEqualator(LambdaTestHelpers::equalsContentsUnordered).
+                excerciseOps(op);
+
         assertUnique(result.iterator());
         if (data.size() > 0)
             assertTrue(result.size() > 0);
@@ -65,4 +69,6 @@
             assertTrue(result.size() == 0);
         assertTrue(result.size() <= data.size());
     }
+
+
 }