changeset 6413:3832536fbed9

(weak) parallel version of SortedOp
author briangoetz
date Thu, 15 Nov 2012 14:27:37 -0500
parents ef5e16bf1045
children 16ebde8b1e17
files src/share/classes/java/util/streams/ops/SortedOp.java
diffstat 1 files changed, 57 insertions(+), 15 deletions(-) [+]
line wrap: on
line diff
--- a/src/share/classes/java/util/streams/ops/SortedOp.java	Thu Nov 15 10:55:22 2012 -0800
+++ b/src/share/classes/java/util/streams/ops/SortedOp.java	Thu Nov 15 14:27:37 2012 -0500
@@ -25,6 +25,7 @@
 package java.util.streams.ops;
 
 import java.util.*;
+import java.util.concurrent.ForkJoinUtils;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 import java.util.streams.ParallelPipelineHelper;
@@ -75,25 +76,56 @@
         if (StreamOpFlags.SORTED.isKnown(flags)) {
             return sink;
         }
+        // Optimized version if we know the size
+        // @@@ Commented out pending resolution of why size=-1 in this case
+//        else if (StreamOpFlags.SIZED.isKnown(flags)) {
+//            return new Sink.ChainedValue<T>(sink) {
+//                T[] array;
+//                int offset;
+//
+//                PriorityQueue<T> pq;
+//                @Override
+//                public void begin(int size) {
+//                    array = (T[]) new Object[size];
+//                }
+//
+//                @Override
+//                public void end() {
+//                    Arrays.sort(array, comparator);
+//                    downstream.begin(array.length);
+//                    for (T t : array)
+//                        downstream.apply(t);
+//                    downstream.end();
+//                    array = null;
+//                }
+//
+//                @Override
+//                public void apply(T t) {
+//                    array[offset++] = t;
+//                }
+//            };
+//        }
         else {
             return new Sink.ChainedValue<T>(sink) {
-                PriorityQueue<T> pq;
+                ArrayList<T> list;
+
                 @Override
                 public void begin(int size) {
-                    pq = new PriorityQueue<>(size > 0 ? size : DEFAULT_PRIORITY_QUEUE_SIZE, comparator);
+                    list = new ArrayList<>();
                 }
 
                 @Override
                 public void end() {
-                    downstream.begin(pq.size());
-                    while (!pq.isEmpty())
-                        downstream.apply(pq.remove());
+                    list.sort(comparator);
+                    downstream.begin(list.size());
+                    list.forEach(e -> downstream.apply(e));
                     downstream.end();
+                    list = null;
                 }
 
                 @Override
                 public void apply(T t) {
-                    pq.add(t);
+                    list.add(t);
                 }
             };
         }
@@ -113,19 +145,28 @@
     public static <T> Iterator<T> iterator(Iterator<? extends T> iterator, Comparator<? super T> comparator) {
         Objects.requireNonNull(iterator);
         Objects.requireNonNull(comparator);
-        final PriorityQueue<T> pq = new PriorityQueue<>(DEFAULT_PRIORITY_QUEUE_SIZE, comparator);
-        while (iterator.hasNext()) {
-            pq.add(iterator.next());
-        }
+
         return new Iterator<T>() {
+            Iterator<T> sortedIterator = null;
+
             @Override
             public boolean hasNext() {
-                return !pq.isEmpty();
+                if (sortedIterator == null) {
+                    List<T> list = new ArrayList<>();
+                    while (iterator.hasNext())
+                        list.add(iterator.next());
+                    list.sort(comparator);
+                    sortedIterator = list.iterator();
+                }
+                return sortedIterator.hasNext();
             }
 
             @Override
             public T next() {
-                return pq.remove();
+                if (hasNext())
+                    return sortedIterator.next();
+                else
+                    throw new NoSuchElementException();
             }
         };
     }
@@ -135,9 +176,10 @@
             return helper.collectOutput();
         }
         else {
-            // @@@ Cannot refer to StreamOp.evaluateParallel default method
-            Logger.getLogger(getClass().getName()).log(Level.WARNING, "{0} using computeParallel serial default", getClass().getSimpleName());
-            return evaluateSequential(helper);
+            // @@@ Weak two-pass parallel implementation; parallel collect, parallel sort
+            T[] flattenedData = helper.collectOutput().flatten().asArray();
+            ForkJoinUtils.parallelSort(flattenedData, comparator);
+            return Nodes.node(flattenedData);
         }
     }