changeset 6382:887edbc2572b

Add parallel version of findAny
author briangoetz
date Thu, 08 Nov 2012 19:12:00 -0500
parents 9de13de7fc77
children 1fca62c40fb7
files src/share/classes/java/util/streams/ops/FindAnyOp.java test-ng/tests/org/openjdk/tests/java/util/streams/ops/FindAnyOpTest.java
diffstat 2 files changed, 81 insertions(+), 2 deletions(-) [+]
line wrap: on
line diff
--- a/src/share/classes/java/util/streams/ops/FindAnyOp.java	Thu Nov 08 19:09:44 2012 -0500
+++ b/src/share/classes/java/util/streams/ops/FindAnyOp.java	Thu Nov 08 19:12:00 2012 -0500
@@ -26,7 +26,11 @@
 
 import java.util.Iterator;
 import java.util.Optional;
+import java.util.concurrent.CountedCompleter;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.streams.ParallelPipelineHelper;
 import java.util.streams.PipelineHelper;
+import java.util.streams.Spliterator;
 
 /**
  * FindAnyOp
@@ -51,8 +55,66 @@
         return evaluate(helper.iterator());
     }
 
+    @Override
+    public <P_IN> Optional<T> evaluateParallel(ParallelPipelineHelper<P_IN, T> helper) {
+        // Approach for parallel implementation:
+        // - Maintain raw result as AtomicReference<Optional<T>>
+        // - Decompose as per usual
+        // - For each chunk, pull first element; if present, complete root
+
+        return helper.invoke(new FindAnyTask<>(helper));
+    }
+
     // For testing purposes
     public Optional<T> evaluate(Iterator<T> iterator) {
         return iterator.hasNext() ? new Optional<>(iterator.next()) : Optional.<T>empty();
     }
+
+    private static class FindAnyTask<S, T> extends AbstractTask<S, T, Optional<T>, FindAnyTask<S, T>> {
+        private final AtomicReference<Optional<T>> answer;
+
+        private FindAnyTask(ParallelPipelineHelper<S, T> helper) {
+            super(helper);
+            this.answer = new AtomicReference<>(null);
+        }
+
+        private FindAnyTask(FindAnyTask<S, T> parent, Spliterator<S> spliterator) {
+            super(parent, spliterator);
+            this.answer = parent.answer;
+        }
+
+        @Override
+        protected FindAnyTask<S, T> makeChild(Spliterator<S> spliterator) {
+            return new FindAnyTask<>(this, spliterator);
+        }
+
+        @Override
+        protected Optional<T> doLeaf() {
+            Iterator<T> iterator = helper.wrapIterator(spliterator.iterator());
+            if (iterator.hasNext()) {
+                completeRoot(new Optional<>(iterator.next()));
+            }
+            return null;
+        }
+
+        @Override
+        public Optional<T> getRawResult() {
+            Optional<T> result = answer.get();
+            return result == null ? Optional.<T>empty() : result;
+        }
+
+        @Override
+        protected void setRawResult(Optional<T> result) {
+            if (result != null)
+                answer.compareAndSet(null, result);
+        }
+
+        // @@@ In CC.complete(r), should onCompletion be called before setRawResult(r) ?
+        // Below code is subtly flawed for this reason.  Currently work around this by mangling getRawResult.
+//        @Override
+//        public void onCompletion(CountedCompleter caller) {
+//            if (getParent() == null)
+//                setRawResult(Optional.<T>empty());
+//        }
+    }
 }
--- a/test-ng/tests/org/openjdk/tests/java/util/streams/ops/FindAnyOpTest.java	Thu Nov 08 19:09:44 2012 -0500
+++ b/test-ng/tests/org/openjdk/tests/java/util/streams/ops/FindAnyOpTest.java	Thu Nov 08 19:12:00 2012 -0500
@@ -28,7 +28,13 @@
 import org.testng.annotations.Test;
 
 import java.util.Collections;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+import java.util.functions.BiPredicate;
+import java.util.streams.ops.FilterOp;
 import java.util.streams.ops.FindAnyOp;
+import java.util.streams.ops.IntermediateOp;
 
 import static org.openjdk.tests.java.util.LambdaTestHelpers.*;
 
@@ -51,8 +57,19 @@
         assertTrue(countTo(10).stream().filter(pEven).findAny().isPresent(), "with result");
     }
 
+    public void testFindAnyParallel() {
+        assertFalse(Collections.<Integer>emptySet().parallel().findAny().isPresent(), "no result");
+        assertFalse(countTo(1000).parallel().filter(x -> x > 1000).findAny().isPresent(), "no result");
+        assertTrue(countTo(1000).parallel().filter(pEven).findAny().isPresent(), "with result");
+    }
+
     @Test(dataProvider = "opArrays", dataProviderClass = StreamTestDataProvider.class)
     public void testOps(String name, TestData<Integer> data) {
-        exerciseOps(data, FindAnyOp.<Integer>singleton());
-    };
+        // @@@ Weak test -- only tests that all versions either find an answer or don't, don't assert validity of answer
+        // Would be good to test that the result is actually a member of the stream
+        BiPredicate<Optional<Integer>, Optional<Integer>> validAnswer = (a, b) -> a.isPresent() == b.isPresent();
+        exerciseOps(data, validAnswer, FindAnyOp.<Integer>singleton());
+        exerciseOps(data, validAnswer, FindAnyOp.<Integer>singleton(), new FilterOp<>(pTrue));
+        exerciseOps(data, validAnswer, FindAnyOp.<Integer>singleton(), new FilterOp<>(pEven));
+    }
 }