changeset 6076:9ee3890658f9

Add encounter order flag to stream accessors and operations. Implement GroupOp.evaluateParallel, which currently does not preserve encounter order.
author psandoz
date Mon, 15 Oct 2012 15:20:35 -0700
parents 6e0ca4cca828
children c6f42702e95f
files src/share/classes/java/util/LinkedHashMap.java src/share/classes/java/util/LinkedHashSet.java src/share/classes/java/util/List.java src/share/classes/java/util/Set.java src/share/classes/java/util/SortedMap.java src/share/classes/java/util/SortedSet.java src/share/classes/java/util/streams/AbstractPipeline.java src/share/classes/java/util/streams/Spliterator.java src/share/classes/java/util/streams/Stream.java src/share/classes/java/util/streams/StreamAccessor.java src/share/classes/java/util/streams/Streams.java src/share/classes/java/util/streams/ops/FlatMapOp.java src/share/classes/java/util/streams/ops/GroupByOp.java src/share/classes/java/util/streams/ops/SortedOp.java src/share/classes/java/util/streams/ops/ToArrayOp.java src/share/classes/java/util/streams/ops/TreeUtils.java
diffstat 16 files changed, 186 insertions(+), 72 deletions(-) [+]
line wrap: on
line diff
--- a/src/share/classes/java/util/LinkedHashMap.java	Mon Oct 15 15:06:03 2012 -0700
+++ b/src/share/classes/java/util/LinkedHashMap.java	Mon Oct 15 15:20:35 2012 -0700
@@ -24,7 +24,9 @@
  */
 
 package java.util;
-import java.io.*;
+import java.util.streams.MapStream;
+import java.util.streams.Stream;
+import java.util.streams.Streams;
 
 /**
  * <p>Hash table and linked list implementation of the <tt>Map</tt> interface,
@@ -489,4 +491,9 @@
     protected boolean removeEldestEntry(Map.Entry<K,V> eldest) {
         return false;
     }
+
+    @Override
+    public MapStream<K,V> stream() {
+        return Streams.stream(this, Stream.FLAG_ORDERED);
+    }
 }
--- a/src/share/classes/java/util/LinkedHashSet.java	Mon Oct 15 15:06:03 2012 -0700
+++ b/src/share/classes/java/util/LinkedHashSet.java	Mon Oct 15 15:20:35 2012 -0700
@@ -25,6 +25,9 @@
 
 package java.util;
 
+import java.util.streams.Stream;
+import java.util.streams.Streams;
+
 /**
  * <p>Hash table and linked list implementation of the <tt>Set</tt> interface,
  * with predictable iteration order.  This implementation differs from
@@ -168,4 +171,9 @@
         super(Math.max(2*c.size(), 11), .75f, true);
         addAll(c);
     }
+
+    @Override
+    public Stream<E> stream() {
+        return Streams.stream(this, Stream.FLAG_DISTINCT | Stream.FLAG_ORDERED);
+    }
 }
--- a/src/share/classes/java/util/List.java	Mon Oct 15 15:06:03 2012 -0700
+++ b/src/share/classes/java/util/List.java	Mon Oct 15 15:20:35 2012 -0700
@@ -25,7 +25,8 @@
 
 package java.util;
 
-import java.util.functions.Predicate;
+import java.util.streams.Stream;
+import java.util.streams.Streams;
 
 /**
  * An ordered collection (also known as a <i>sequence</i>).  The user of this
@@ -612,5 +613,9 @@
         Collections.<E>sort(this,c);
     }
 
-
+    @Override
+    Stream<E> stream() default {
+        // @@@ If instance of RandomAccess, then can choose optimial implementation RandomAccessListStreamAccessor
+        return Streams.stream(this, Stream.FLAG_ORDERED);
+    }
 }
--- a/src/share/classes/java/util/Set.java	Mon Oct 15 15:06:03 2012 -0700
+++ b/src/share/classes/java/util/Set.java	Mon Oct 15 15:20:35 2012 -0700
@@ -25,6 +25,9 @@
 
 package java.util;
 
+import java.util.streams.Stream;
+import java.util.streams.Streams;
+
 /**
  * A collection that contains no duplicate elements.  More formally, sets
  * contain no pair of elements <code>e1</code> and <code>e2</code> such that
@@ -382,4 +385,9 @@
      * @see Set#equals(Object)
      */
     int hashCode();
+
+    @Override
+    Stream<E> stream() default {
+        return Streams.stream(this, Stream.FLAG_DISTINCT);
+    }
 }
--- a/src/share/classes/java/util/SortedMap.java	Mon Oct 15 15:06:03 2012 -0700
+++ b/src/share/classes/java/util/SortedMap.java	Mon Oct 15 15:20:35 2012 -0700
@@ -25,6 +25,10 @@
 
 package java.util;
 
+import java.util.streams.MapStream;
+import java.util.streams.Stream;
+import java.util.streams.Streams;
+
 /**
  * A {@link Map} that further provides a <em>total ordering</em> on its keys.
  * The map is ordered according to the {@linkplain Comparable natural
@@ -281,4 +285,9 @@
      *         sorted in ascending key order
      */
     Set<Map.Entry<K, V>> entrySet();
+
+    @Override
+    MapStream<K,V> stream() default {
+        return Streams.stream(this, Stream.FLAG_SORTED | Stream.FLAG_ORDERED);
+    }
 }
--- a/src/share/classes/java/util/SortedSet.java	Mon Oct 15 15:06:03 2012 -0700
+++ b/src/share/classes/java/util/SortedSet.java	Mon Oct 15 15:20:35 2012 -0700
@@ -25,6 +25,9 @@
 
 package java.util;
 
+import java.util.streams.Stream;
+import java.util.streams.Streams;
+
 /**
  * A {@link Set} that further provides a <i>total ordering</i> on its elements.
  * The elements are ordered using their {@linkplain Comparable natural
@@ -219,4 +222,9 @@
      * @throws NoSuchElementException if this set is empty
      */
     E last();
+
+    @Override
+    Stream<E> stream() default {
+        return Streams.stream(this, Stream.FLAG_DISTINCT | Stream.FLAG_SORTED | Stream.FLAG_ORDERED);
+    }
 }
--- a/src/share/classes/java/util/streams/AbstractPipeline.java	Mon Oct 15 15:06:03 2012 -0700
+++ b/src/share/classes/java/util/streams/AbstractPipeline.java	Mon Oct 15 15:20:35 2012 -0700
@@ -111,7 +111,10 @@
             if (upToOp < ops.length) {
                 StatefulOp op = (StatefulOp) ops[upToOp];
                 TreeUtils.Node<?> intermediateResult = evaluateParallel(accessor, ops, fromOp, upToOp, op);
-                accessor = new Streams.SpliteratorStreamAccessor(intermediateResult.spliterator(), intermediateResult.size());
+                // @@@ Inherit other flags from pipeline e.g. the intermediate result may be sorted and/or distinct,
+                //     and is ordered
+                accessor = new Streams.SpliteratorStreamAccessor(intermediateResult.spliterator(),
+                                                                 intermediateResult.size(), Stream.FLAG_ORDERED);
 
                 fromOp = ++upToOp;
             }
--- a/src/share/classes/java/util/streams/Spliterator.java	Mon Oct 15 15:06:03 2012 -0700
+++ b/src/share/classes/java/util/streams/Spliterator.java	Mon Oct 15 15:20:35 2012 -0700
@@ -76,6 +76,9 @@
 
     /**
      * Send the remaining elements sequentially into the specified sink.
+     * <p>The implementation of this method must, before accepting elements on the sink,
+     * invoke {@link Sink#begin(int)}, and after accepting all elements, invoke
+     * {@link Sink#end()}</p>
      */
     void into(Sink<T, ?, ?> sink) default {
         sink.begin(getSizeIfKnown());
--- a/src/share/classes/java/util/streams/Stream.java	Mon Oct 15 15:06:03 2012 -0700
+++ b/src/share/classes/java/util/streams/Stream.java	Mon Oct 15 15:20:35 2012 -0700
@@ -46,26 +46,46 @@
     public static final int FLAG_DISTINCT = 1 << 0;
 
     /**
-     * Stream elements are sorted. Elements are {@code Comparable} and each
-     * element is greater or equal to the element which preceed it (if any) and
-     * less than or equal to the element which follows it (if any).
+     * Stream elements are sorted. Each element is greater or equal to the element which
+     * preceed it (if any) and less than or equal to the element which follows it (if any).
+     * <p>Implies encounter order.</p>
      */
+    // @@@ logical or with FLAG_ORDERED? works in the positive sense but does not imply in the negative sense,
+    //     for example the map operation will result in output elements that are not known to be sorted but encounter
+    //     order is preserved
     public static final int FLAG_SORTED = 1 << 1;
 
     /**
+     * Stream elements have an encounter order.
+     * <p>Certain collections have an expected order when encoutering elements in the collection while traversing,
+     * such as an array or {@link List}. That order is referred to as encounter order.
+     * Other collections may have no such order, such as a {@link HashSet},
+     * or {@link HashMap} when traversing the keys.</p>
+     * <p>Encounter order is important when the order at the input to a pipeline should be preserved at the output,
+     * for example when a list of elements is mapped to another list of elements the encouter order of the output
+     * list should correlate with the encouter order of the input list.</p>
+     * <p>Encounter order is also relevant when choosing an algorithm to process elements in parallel.
+     * If encounter order is to be preserved the parallel algorithm will need to apply associative only functions
+     * i.e. a function can be applied to any grouping of elements but the order of elements cannot change.</p>
+     * <p>Stream elements sourced from an array have an encounter order that is also a spatial order.</p>
+     * <p>An infinite stream of elements may have an encounter order.</p>
+     */
+    public static final int FLAG_ORDERED = 1 << 2;
+
+    /**
      * Stream size can be calculated in less than {@code O(n)} time.
      */
-    public static final int FLAG_SIZED = 1 << 2;
+    public static final int FLAG_SIZED = 1 << 3;
 
     /**
      * Stream size is known to be infinite. Mutually exclusive to {@link #FLAG_SIZED}.
      */
-    public static final int FLAG_INFINITE = 1 << 3;
+    public static final int FLAG_INFINITE = 1 << 4;
 
     /**
      * Mask of state bits for defined states.
      */
-    public static final int FLAG_MASK = (1 << 4) - 1;
+    public static final int FLAG_MASK = (1 << 5) - 1;
 
     /**
      * Mask of undefined state bits.
--- a/src/share/classes/java/util/streams/StreamAccessor.java	Mon Oct 15 15:06:03 2012 -0700
+++ b/src/share/classes/java/util/streams/StreamAccessor.java	Mon Oct 15 15:20:35 2012 -0700
@@ -36,6 +36,9 @@
 public interface StreamAccessor<T> {
     /**
      * Provides any remaining elements into the provided sink.
+     * <p>The implementation of this method must, before accepting elements on the sink,
+     * invoke {@link Sink#begin(int)}, and after accepting all elements, invoke
+     * {@link Sink#end()}</p>
      *
      * @param sink The sink to which elements will be provided.
      */
--- a/src/share/classes/java/util/streams/Streams.java	Mon Oct 15 15:06:03 2012 -0700
+++ b/src/share/classes/java/util/streams/Streams.java	Mon Oct 15 15:20:35 2012 -0700
@@ -41,12 +41,12 @@
 
     // MapStream
 
-    public static<K,V> MapStream<K,V> stream(SortedMap<K,V> source) {
-        return new MapPipeline<>(new MapTraversableMapStreamAccessor<>(source, true, true, source.size()));
+    public static<K,V> MapStream<K,V> stream(Map<K,V> source) {
+        return new MapPipeline<>(new MapTraversableMapStreamAccessor<>(source, source.size(), Stream.FLAG_DISTINCT));
     }
 
-    public static<K,V> MapStream<K,V> stream(Map<K,V> source) {
-        return new MapPipeline<>(new MapTraversableMapStreamAccessor<>(source, true, false, source.size()));
+    public static<K,V> MapStream<K,V> stream(Map<K,V> source, int flags) {
+        return new MapPipeline<>(new MapTraversableMapStreamAccessor<>(source, source.size(), Stream.FLAG_DISTINCT | flags));
     }
 
     public static<K,V> MapStream<K,V> stream(MapTraversable<K,V> source) {
@@ -54,45 +54,57 @@
     }
 
     public static<K,V> MapStream<K,V> stream(MapTraversable<K,V> source, int sizeOrUnknown) {
-        return new MapPipeline<>(new MapTraversableMapStreamAccessor<>(source, false, false, sizeOrUnknown));
+        return new MapPipeline<>(new MapTraversableMapStreamAccessor<>(source, sizeOrUnknown));
+    }
+
+    public static<K,V> MapStream<K,V> stream(MapTraversable<K,V> source, int sizeOrUnknown, int flags) {
+        return new MapPipeline<>(new MapTraversableMapStreamAccessor<>(source, sizeOrUnknown, flags));
     }
 
     // Stream
 
-    public static<T> Stream<T> stream(SortedSet<T> source) {
-        return new ValuePipeline<>(new TraversableStreamAccessor<>(source, true, true, source.size()));
+    public static<T> Stream<T> stream(Collection<T> source) {
+        return new ValuePipeline<>(new TraversableStreamAccessor<>(source, source.size()));
     }
 
-    public static<T> Stream<T> stream(Set<T> source) {
-        return new ValuePipeline<>(new TraversableStreamAccessor<>(source, true, false, source.size()));
-    }
-
-    public static<T> Stream<T> stream(Collection<T> source) {
-        return new ValuePipeline<>(new TraversableStreamAccessor<>(source, false, false, source.size()));
-    }
-
-    public static<T> Stream<T> stream(Traversable<T> source, int sizeOrUnknown) {
-        return new ValuePipeline<>(new TraversableStreamAccessor<>(source, false, false, sizeOrUnknown));
+    public static<T> Stream<T> stream(Collection<T> source, int flags) {
+        return new ValuePipeline<>(new TraversableStreamAccessor<>(source, source.size(), flags));
     }
 
     public static<T> Stream<T> stream(Traversable<T> source) {
         return new ValuePipeline<>(new TraversableStreamAccessor<>(source));
     }
 
-    public static<T> Stream<T> stream(Iterable<T> source, int sizeOrUnknown) {
-        return stream(source.iterator(), sizeOrUnknown);
+    public static<T> Stream<T> stream(Traversable<T> source, int sizeOrUnknown) {
+        return new ValuePipeline<>(new TraversableStreamAccessor<>(source, sizeOrUnknown));
+    }
+
+    public static<T> Stream<T> stream(Traversable<T> source, int sizeOrUnknown, int flags) {
+        return new ValuePipeline<>(new TraversableStreamAccessor<>(source, sizeOrUnknown, flags));
     }
 
     public static<T> Stream<T> stream(Iterable<T> source) {
         return stream(source.iterator());
     }
 
+    public static<T> Stream<T> stream(Iterable<T> source, int sizeOrUnknown) {
+        return stream(source.iterator(), sizeOrUnknown);
+    }
+
+    public static<T> Stream<T> stream(Iterable<T> source, int sizeOrUnknown, int flags) {
+        return stream(source.iterator(), sizeOrUnknown, flags);
+    }
+
+    public static<T> Stream<T> stream(Iterator<T> source) {
+        return new ValuePipeline<>(new IteratorStreamAccessor<>(source));
+    }
+
     public static<T> Stream<T> stream(Iterator<T> source, int sizeOrUnknown) {
         return new ValuePipeline<>(new IteratorStreamAccessor<>(source, sizeOrUnknown));
     }
 
-    public static<T> Stream<T> stream(Iterator<T> source) {
-        return new ValuePipeline<>(new IteratorStreamAccessor<>(source));
+    public static<T> Stream<T> stream(Iterator<T> source, int sizeOrUnknown, int flags) {
+        return new ValuePipeline<>(new IteratorStreamAccessor<>(source, sizeOrUnknown, flags));
     }
 
     public static <T, L extends RandomAccess & List<T>> Spliterator<T> spliterator(L source) {
@@ -144,12 +156,16 @@
         return new ValuePipeline<>(new ArrayParallelStreamAccessor<>(source, offset, length));
     }
 
+    public static<T> Stream<T> parallel(Spliterator<T> source) {
+        return new ValuePipeline<>(new SpliteratorStreamAccessor<>(source));
+    }
+
     public static<T> Stream<T> parallel(Spliterator<T> source, int sizeOrUnknown) {
         return new ValuePipeline<>(new SpliteratorStreamAccessor<>(source, sizeOrUnknown));
     }
 
-    public static<T> Stream<T> parallel(Spliterator<T> source) {
-        return new ValuePipeline<>(new SpliteratorStreamAccessor<>(source));
+    public static<T> Stream<T> parallel(Spliterator<T> source, int sizeOrUnknown, int flags) {
+        return new ValuePipeline<>(new SpliteratorStreamAccessor<>(source, sizeOrUnknown, flags));
     }
 
     // Infinite streams
@@ -175,12 +191,7 @@
 
     public static<T> Stream<T> repeat(final int n, final T t) {
         if (n < 0) {
-            final InfiniteIterator<T> iterate = new InfiniteIterator<T>() {
-                @Override
-                public T next() {
-                    return t;
-                }
-            };
+            final InfiniteIterator<T> iterate = () -> t;
 
             return new ValuePipeline<>(new InfiniteIteratorStreamAccessor<>(iterate));
         }
@@ -204,7 +215,7 @@
                 }
             };
 
-            return new ValuePipeline<>(new IteratorStreamAccessor<>(repeat));
+            return new ValuePipeline<>(new IteratorStreamAccessor<>(repeat, -1, Stream.FLAG_ORDERED));
         }
     }
 
@@ -240,7 +251,7 @@
                 }
             };
 
-            return new ValuePipeline<>(new IteratorStreamAccessor<>(repeatedly));
+            return new ValuePipeline<>(new IteratorStreamAccessor<>(repeatedly,  -1, Stream.FLAG_ORDERED));
         }
     }
 
@@ -312,15 +323,21 @@
             implements StreamAccessor.ForSequential<T>, Iterator<T> {
         private final Iterator<T> it;
         private final int sizeOrUnknown;
+        private final int flags;
 
         public IteratorStreamAccessor(Iterator<T> it) {
-            this.it = it;
-            this.sizeOrUnknown = -1;
+            this(it, -1, 0);
         }
 
         public IteratorStreamAccessor(Iterator<T> it, int sizeOrUnknown) {
+            this(it, sizeOrUnknown, 0);
+        }
+
+        public IteratorStreamAccessor(Iterator<T> it, int sizeOrUnknown, int flags) {
             this.it = it;
             this.sizeOrUnknown = sizeOrUnknown;
+            this.flags = (sizeOrUnknown >= 0 ? Stream.FLAG_SIZED : 0) |
+                   (flags & (Stream.FLAG_DISTINCT | Stream.FLAG_SORTED | Stream.FLAG_ORDERED));
         }
 
         @Override
@@ -348,7 +365,7 @@
 
         @Override
         public int getStreamFlags() {
-            return (sizeOrUnknown >= 0) ? Stream.FLAG_SIZED : 0;
+            return flags;
         }
 
         @Override
@@ -394,7 +411,8 @@
 
         @Override
         public int getStreamFlags() {
-            return Stream.FLAG_INFINITE;
+            // @@@ Should encounter order be set by default?
+            return Stream.FLAG_INFINITE | Stream.FLAG_ORDERED;
         }
 
         @Override
@@ -406,15 +424,21 @@
     static class SpliteratorStreamAccessor<T> implements StreamAccessor<T> {
         private final Spliterator<T> spliterator;
         private final int sizeOrUnknown;
+        private final int flags;
 
-        public SpliteratorStreamAccessor(Spliterator<T> spliterator) {
-            this.spliterator = spliterator;
-            this.sizeOrUnknown = -1;
+        public SpliteratorStreamAccessor(Spliterator<T> it) {
+            this(it, -1, 0);
         }
 
-        public SpliteratorStreamAccessor(Spliterator<T> spliterator, int sizeOrUnknown) {
+        public SpliteratorStreamAccessor(Spliterator<T> it, int sizeOrUnknown) {
+            this(it, sizeOrUnknown, 0);
+        }
+
+        public SpliteratorStreamAccessor(Spliterator<T> spliterator, int sizeOrUnknown, int flags) {
             this.spliterator = spliterator;
             this.sizeOrUnknown = sizeOrUnknown;
+            this.flags = (sizeOrUnknown >= 0 ? Stream.FLAG_SIZED : 0) |
+                         (flags & (Stream.FLAG_DISTINCT | Stream.FLAG_SORTED | Stream.FLAG_ORDERED));
         }
 
         @Override
@@ -424,8 +448,6 @@
 
         @Override
         public int getStreamFlags() {
-            int flags = 0;
-            flags |= (sizeOrUnknown >= 0) ? Stream.FLAG_SIZED : 0;
             return flags;
         }
 
@@ -463,14 +485,17 @@
         Iterator<T> iterator = null;
 
         TraversableStreamAccessor(Traversable<T> traversable) {
-            this(traversable, false, false, -1);
+            this(traversable, -1, 0);
         }
 
-        TraversableStreamAccessor(Traversable<T> traversable, boolean distinct, boolean sorted, int sizeOrUnknown) {
+        TraversableStreamAccessor(Traversable<T> traversable, int sizeOrUnknown) {
+            this(traversable, sizeOrUnknown, 0);
+        }
+
+        TraversableStreamAccessor(Traversable<T> traversable, int sizeOrUnknown, int flags) {
             this.traversable = traversable;
-            this.flags = (distinct ? Stream.FLAG_DISTINCT : 0 ) |
-                        (sorted ? Stream.FLAG_SORTED : 0 ) |
-                        (sizeOrUnknown >= 0 ? Stream.FLAG_SIZED : 0);
+            this.flags = (sizeOrUnknown >= 0 ? Stream.FLAG_SIZED : 0) |
+                         (flags & (Stream.FLAG_DISTINCT | Stream.FLAG_SORTED | Stream.FLAG_ORDERED));
             this.sizeOrUnknown = sizeOrUnknown;
         }
 
@@ -523,14 +548,17 @@
         MapIterator<K,V> iterator = null;
 
         MapTraversableMapStreamAccessor(MapTraversable<K,V> traversable) {
-            this(traversable, false, false, -1);
+            this(traversable, -1, 0);
         }
 
-        MapTraversableMapStreamAccessor(MapTraversable<K,V> traversable, boolean distinct, boolean sorted, int sizeOrUnknown) {
+        MapTraversableMapStreamAccessor(MapTraversable<K,V> traversable, int sizeOrUnknown) {
+            this(traversable, sizeOrUnknown, 0);
+        }
+
+        MapTraversableMapStreamAccessor(MapTraversable<K,V> traversable, int sizeOrUnknown, int flags) {
             this.traversable = traversable;
-            this.flags = (distinct ? Stream.FLAG_DISTINCT : 0 ) |
-                        (sorted ? Stream.FLAG_SORTED : 0 ) |
-                        (sizeOrUnknown >= 0 ? Stream.FLAG_SIZED : 0);
+            this.flags = (sizeOrUnknown >= 0 ? Stream.FLAG_SIZED : 0) |
+                         (flags & (Stream.FLAG_DISTINCT | Stream.FLAG_SORTED | Stream.FLAG_ORDERED));
             this.sizeOrUnknown = sizeOrUnknown;
         }
 
@@ -812,7 +840,7 @@
 
         @Override
         public int getStreamFlags() {
-            return Stream.FLAG_SIZED;
+            return Stream.FLAG_SIZED | Stream.FLAG_ORDERED;
         }
 
         @Override
@@ -855,7 +883,7 @@
 
         @Override
         public int getStreamFlags() {
-            return Stream.FLAG_SIZED;
+            return Stream.FLAG_SIZED | Stream.FLAG_ORDERED;
         }
 
         @Override
@@ -898,7 +926,7 @@
 
         @Override
         public int getStreamFlags() {
-            return Stream.FLAG_SIZED;
+            return Stream.FLAG_SIZED | Stream.FLAG_ORDERED;
         }
 
         @Override
@@ -930,7 +958,7 @@
 
         @Override
         public int getStreamFlags() {
-            return Stream.FLAG_SIZED;
+            return Stream.FLAG_SIZED | Stream.FLAG_ORDERED;
         }
 
         @Override
--- a/src/share/classes/java/util/streams/ops/FlatMapOp.java	Mon Oct 15 15:06:03 2012 -0700
+++ b/src/share/classes/java/util/streams/ops/FlatMapOp.java	Mon Oct 15 15:20:35 2012 -0700
@@ -48,7 +48,7 @@
 
     @Override
     public int getStreamFlags(int upstreamFlags) {
-        return upstreamFlags & ~(Stream.FLAG_SORTED | Stream.FLAG_DISTINCT | Stream.FLAG_SIZED | Stream.FLAG_UNKNOWN_MASK_V1);
+        return upstreamFlags & ~(Stream.FLAG_SORTED | Stream.FLAG_DISTINCT | Stream.FLAG_ORDERED | Stream.FLAG_SIZED | Stream.FLAG_UNKNOWN_MASK_V1);
     }
 
     @Override
--- a/src/share/classes/java/util/streams/ops/GroupByOp.java	Mon Oct 15 15:06:03 2012 -0700
+++ b/src/share/classes/java/util/streams/ops/GroupByOp.java	Mon Oct 15 15:20:35 2012 -0700
@@ -30,6 +30,8 @@
 import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.functions.Mapper;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 import java.util.streams.*;
 
 /**
@@ -42,8 +44,6 @@
  * @param <K> Type of elements in the resulting Map.
  * @author Brian Goetz
  */
-// @@@ An encounter order flag can be used to switch between two algorithms, one that preserves that order
-//     and another, currently implemented, that does not.
 public class GroupByOp<T, K> implements TerminalOp<T, Map<K, Collection<T>>> {
     private final Mapper<? super T, ? extends K> mapper;
 
@@ -89,6 +89,12 @@
 
     @Override
     public <S> Map<K, Collection<T>> evaluateParallel(ParallelPipelineHelper<S, T> helper) {
+        // @@@ If encounter order is required then a different algorithm will need to be used that preserves
+        //     the order of the grouped elements, otherwise the order does not need to be preserved
+        if ((helper.getFlags() & Stream.FLAG_ORDERED) != 0) {
+            Logger.getLogger(getClass().getName()).log(Level.WARNING, "GroupByOp.evaluateParallel does not preserve encounter order");
+        }
+
         final ConcurrentHashMap<K, StreamBuilder<T>> map = new ConcurrentHashMap<>();
 
         // Cache the sink chain, so it can be reused by all F/J leaf tasks
--- a/src/share/classes/java/util/streams/ops/SortedOp.java	Mon Oct 15 15:06:03 2012 -0700
+++ b/src/share/classes/java/util/streams/ops/SortedOp.java	Mon Oct 15 15:20:35 2012 -0700
@@ -114,6 +114,6 @@
 
     @Override
     public int getStreamFlags(int upstreamFlags) {
-        return upstreamFlags | Stream.FLAG_SORTED | Stream.FLAG_SIZED;
+        return upstreamFlags | Stream.FLAG_SORTED | Stream.FLAG_SIZED | Stream.FLAG_ORDERED;
     }
 }
--- a/src/share/classes/java/util/streams/ops/ToArrayOp.java	Mon Oct 15 15:06:03 2012 -0700
+++ b/src/share/classes/java/util/streams/ops/ToArrayOp.java	Mon Oct 15 15:20:35 2012 -0700
@@ -90,6 +90,12 @@
 
     @Override
     public <P_IN> Object[] evaluateParallel(ParallelPipelineHelper<P_IN, T> helper) {
+        // @@@ If the previous op is a stateful op then can optimize by getting direct access to the
+        //     node, if that node is flat then the array can be extracted, otherwise if the node is a tree
+        //     then that tree can be flattened, in parallel, using the ToArrayTask and then the array can be extracted
+        //     Require Spliterator.toArray with default method
+
+        // Ensure tree is flattened
         TreeUtils.Node<T> node = TreeUtils.collect(helper, false);
         @SuppressWarnings("unchecked")
         T[] array = (T[]) new Object[node.size()];
--- a/src/share/classes/java/util/streams/ops/TreeUtils.java	Mon Oct 15 15:06:03 2012 -0700
+++ b/src/share/classes/java/util/streams/ops/TreeUtils.java	Mon Oct 15 15:20:35 2012 -0700
@@ -300,7 +300,7 @@
 
             @Override
             public Stream<T> stream() {
-                return Streams.stream(data.iterator(), data.size());
+                return Streams.stream(data.iterator(), data.size(), Stream.FLAG_ORDERED);
             }
 
             @Override
@@ -367,12 +367,12 @@
 
         @Override
         public Stream<T> stream() {
-            return Streams.stream(iterator(), size());
+            return Streams.stream(iterator(), size(), Stream.FLAG_ORDERED);
         }
 
         @Override
         public Stream<T> parallel() {
-            return Streams.parallel(spliterator(), size());
+            return Streams.parallel(spliterator(), size(), Stream.FLAG_ORDERED);
         }
 
         @Override