changeset 5863:682e571b428a it2-bootstrap

Major refactor of Stream and Pipeline hierarchies: remove Stream methods from Streamable; merge Stream and ParallelStream; base all Pipeline classes off of common AbstractPipeline
author briangoetz
date Sat, 25 Aug 2012 19:03:55 -0400
parents 9994054465df
children 78975c11b889
files make/java/java/FILES_java.gmk src/share/classes/java/lang/CharSequence.java src/share/classes/java/lang/String.java src/share/classes/java/util/Arrays.java src/share/classes/java/util/Collection.java src/share/classes/java/util/CollectionHelpers.java src/share/classes/java/util/MapIterator.java src/share/classes/java/util/StringJoiner.java src/share/classes/java/util/Traversable.java src/share/classes/java/util/functions/BiSink.java src/share/classes/java/util/functions/Sink.java src/share/classes/java/util/streams/AbstractPipeline.java src/share/classes/java/util/streams/AbstractSequentialStreamAccessor.java src/share/classes/java/util/streams/LinearPipeline.java src/share/classes/java/util/streams/MapPipeline.java src/share/classes/java/util/streams/MapStream.java src/share/classes/java/util/streams/MapStreamAccessor.java src/share/classes/java/util/streams/MapStreamOps.java src/share/classes/java/util/streams/MapStreamable.java src/share/classes/java/util/streams/ParallelPipeline.java src/share/classes/java/util/streams/ParallelStream.java src/share/classes/java/util/streams/ParallelStreamOps.java src/share/classes/java/util/streams/ParallelStreamable.java src/share/classes/java/util/streams/SequentialMapPipeline.java src/share/classes/java/util/streams/SequentialPipeline.java src/share/classes/java/util/streams/SizedStreamable.java src/share/classes/java/util/streams/StatefulBiSink.java src/share/classes/java/util/streams/Stream.java src/share/classes/java/util/streams/StreamAccessor.java src/share/classes/java/util/streams/StreamBuilder.java src/share/classes/java/util/streams/StreamBuilders.java src/share/classes/java/util/streams/StreamOps.java src/share/classes/java/util/streams/Streamable.java src/share/classes/java/util/streams/Streams.java src/share/classes/java/util/streams/ops/AllMatchOp.java src/share/classes/java/util/streams/ops/AnyMatchOp.java src/share/classes/java/util/streams/ops/BiAllMatchOp.java src/share/classes/java/util/streams/ops/BiAnyMatchOp.java src/share/classes/java/util/streams/ops/BiFilterOp.java src/share/classes/java/util/streams/ops/BiMapOp.java src/share/classes/java/util/streams/ops/CumulateOp.java src/share/classes/java/util/streams/ops/EagerOp.java src/share/classes/java/util/streams/ops/ElementwiseOp.java src/share/classes/java/util/streams/ops/FilterOp.java src/share/classes/java/util/streams/ops/FindAnyOp.java src/share/classes/java/util/streams/ops/FindFirstOp.java src/share/classes/java/util/streams/ops/FlatMapOp.java src/share/classes/java/util/streams/ops/FoldOp.java src/share/classes/java/util/streams/ops/ForEachOp.java src/share/classes/java/util/streams/ops/GroupByOp.java src/share/classes/java/util/streams/ops/IdOp.java src/share/classes/java/util/streams/ops/IntermediateOp.java src/share/classes/java/util/streams/ops/MapExtractKeysOp.java src/share/classes/java/util/streams/ops/MapExtractValuesOp.java src/share/classes/java/util/streams/ops/MapFilterKeysOp.java src/share/classes/java/util/streams/ops/MapFilterValuesOp.java src/share/classes/java/util/streams/ops/MapMapValuesOp.java src/share/classes/java/util/streams/ops/MapOp.java src/share/classes/java/util/streams/ops/MapSortedOp.java src/share/classes/java/util/streams/ops/MapSwapOp.java src/share/classes/java/util/streams/ops/MappedOp.java src/share/classes/java/util/streams/ops/NoneMatchOp.java src/share/classes/java/util/streams/ops/ParallelOp.java src/share/classes/java/util/streams/ops/ShortCircuitEagerOp.java src/share/classes/java/util/streams/ops/ShortCircuitTerminalOp.java src/share/classes/java/util/streams/ops/SortedOp.java src/share/classes/java/util/streams/ops/StatefulOp.java src/share/classes/java/util/streams/ops/StatelessOp.java src/share/classes/java/util/streams/ops/TerminalOp.java src/share/classes/java/util/streams/ops/ToArrayOp.java src/share/classes/java/util/streams/ops/TreeUtils.java src/share/classes/java/util/streams/ops/UniqOp.java test-ng/build.xml test-ng/tests/org/openjdk/tests/java/lang/CharSequenceStreamTest.java test-ng/tests/org/openjdk/tests/java/lang/ThreadLocalFactoryTest.java test-ng/tests/org/openjdk/tests/java/lang/ThreadLocalTest.java test-ng/tests/org/openjdk/tests/java/util/FillableStringTest.java test-ng/tests/org/openjdk/tests/java/util/LambdaTestHelpers.java test-ng/tests/org/openjdk/tests/java/util/StringJoinerTest.java test-ng/tests/org/openjdk/tests/java/util/concurrent/AtomicReferenceTest.java test-ng/tests/org/openjdk/tests/java/util/functions/UnaryOperatorTest.java test-ng/tests/org/openjdk/tests/java/util/streams/StreamTestDataProvider.java test-ng/tests/org/openjdk/tests/java/util/streams/ops/CumulateOpTest.java test-ng/tests/org/openjdk/tests/java/util/streams/ops/FilterOpTest.java test-ng/tests/org/openjdk/tests/java/util/streams/ops/FlatMapOpTest.java test-ng/tests/org/openjdk/tests/java/util/streams/ops/GroupByOpTest.java test-ng/tests/org/openjdk/tests/java/util/streams/ops/MapOpTest.java test-ng/tests/org/openjdk/tests/java/util/streams/ops/SortedOpTest.java test-ng/tests/org/openjdk/tests/java/util/streams/ops/StreamOpTestCase.java test-ng/tests/org/openjdk/tests/java/util/streams/ops/ToArrayOpTest.java test-ng/tests/org/openjdk/tests/java/util/streams/ops/UniqOpTest.java test-ng/tests/org/openjdk/tests/javac/LambdaTranslationTest1.java test-ng/tests/org/openjdk/tests/javac/MethodReferenceTestInstanceMethod.java test-ng/tests/org/openjdk/tests/separate/SourceModel.java test-ng/tests/org/openjdk/tests/separate/TestHarness.java test-ng/tests/org/openjdk/tests/vm/FDSeparateCompilationTest.java
diffstat 95 files changed, 2578 insertions(+), 2564 deletions(-) [+]
line wrap: on
line diff
--- a/make/java/java/FILES_java.gmk	Mon Aug 20 16:10:37 2012 -0400
+++ b/make/java/java/FILES_java.gmk	Sat Aug 25 19:03:55 2012 -0400
@@ -400,7 +400,6 @@
     java/util/streams/Streams.java \
     java/util/streams/MapStream.java \
     java/util/streams/ParallelPipeline.java \
-    java/util/streams/SizedStreamable.java \
     java/util/streams/ParallelStreamable.java \
     java/util/streams/ops/MapFilterKeysOp.java \
     java/util/streams/ops/ToArrayOp.java \
--- a/src/share/classes/java/lang/CharSequence.java	Mon Aug 20 16:10:37 2012 -0400
+++ b/src/share/classes/java/lang/CharSequence.java	Sat Aug 25 19:03:55 2012 -0400
@@ -27,8 +27,10 @@
 
 import java.util.Iterator;
 import java.util.NoSuchElementException;
+import java.util.Traversable;
 import java.util.functions.Sink;
-import java.util.streams.Streamable;
+import java.util.streams.Stream;
+import java.util.streams.Streams;
 
 /**
  * A <tt>CharSequence</tt> is a readable sequence of <code>char</code> values. This
@@ -118,8 +120,8 @@
      *
      * @return an Iterable of Character values from this sequence
      */
-    public Streamable<Character> asChars() default {
-        return new CharacterIterable(this);
+    public Stream<Character> asChars() default {
+        return Streams.stream(new CharacterTraversable(this), length());
     }
 
     /**
@@ -137,15 +139,15 @@
      *
      * @return an Iterable of Unicode code points from this sequence
      */
-    public Streamable<Integer> asCodePoints() default {
-        return new CodePointIterable(this);
+    public Stream<Integer> asCodePoints() default {
+        return Streams.stream(new CodePointTraversable(this));
     }
 }
 
-class CharacterIterable implements Streamable<Character> {
+class CharacterTraversable implements Traversable<Character> {
     final CharSequence cs;
 
-    CharacterIterable(CharSequence cs) {
+    CharacterTraversable(CharSequence cs) {
         this.cs = cs;
     }
 
@@ -173,10 +175,10 @@
     }
 }
 
-class CodePointIterable implements Streamable<Integer> {
+class CodePointTraversable implements Traversable<Integer> {
     final CharSequence cs;
 
-    CodePointIterable(CharSequence cs) {
+    CodePointTraversable(CharSequence cs) {
         this.cs = cs;
     }
 
--- a/src/share/classes/java/lang/String.java	Mon Aug 20 16:10:37 2012 -0400
+++ b/src/share/classes/java/lang/String.java	Sat Aug 25 19:03:55 2012 -0400
@@ -37,7 +37,7 @@
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.regex.PatternSyntaxException;
-import java.util.streams.Streamable;
+import java.util.streams.Stream;
 
 /**
  * The {@code String} class represents character strings. All
@@ -2356,11 +2356,11 @@
      * the results are returned as a stream of Strings instead of
      * as an array.
      */
-    public Streamable<String> splitAsStream(String regex, int limit) {
+    public Stream<String> splitAsStream(String regex, int limit) {
         // TODO: it would be better to match and return results
         // incrementally as they are demanded instead of precomputing
         // the entire array.
-        return Arrays.asList(split(regex, limit));
+        return Arrays.asList(split(regex, limit)).stream();
     }
 
     /**
@@ -2383,7 +2383,7 @@
      *     // out is [a, b, c, ghi, jkl, mno]
      * </pre>
      */
-    public Streamable<String> splitAsStream(String regex) {
+    public Stream<String> splitAsStream(String regex) {
         return splitAsStream(regex, 0);
     }
 
--- a/src/share/classes/java/util/Arrays.java	Mon Aug 20 16:10:37 2012 -0400
+++ b/src/share/classes/java/util/Arrays.java	Sat Aug 25 19:03:55 2012 -0400
@@ -27,7 +27,7 @@
 
 import java.lang.reflect.*;
 import java.util.functions.*;
-import java.util.streams.ParallelStreamable;
+import java.util.streams.*;
 
 /**
  * This class contains various methods for manipulating arrays (such as
@@ -3727,7 +3727,7 @@
         }
 
         ArraySpliterator(T[] array, int offset, int length) {
-            this.array = array;
+            this.array = Objects.requireNonNull(array);
             this.curIndex = offset;
             this.endIndex = offset+length;
         }
@@ -3772,34 +3772,53 @@
         }
     }
 
-    private static class ArraySplittable<T> implements ParallelStreamable<T> {
-        private final T[] array;
-        private final int offset;
-        private final int length;
-
-        ArraySplittable(T[] array) {
+    private static class ArrayParallelStreamAccessor<T>
+            extends ArraySpliterator<T>
+            implements StreamAccessor<T>, Iterator<T>, Spliterator<T> {
+        private final int size;
+
+        ArrayParallelStreamAccessor(T[] array) {
             this(array, 0, array.length);
         }
 
-        ArraySplittable(T[] array, int offset, int length) {
-            this.array = Objects.requireNonNull(array);
-            this.offset = offset;
-            this.length = length;
+        ArrayParallelStreamAccessor(T[] array, int offset, int length) {
+            super(array, offset, length);
+            this.size = length;
+        }
+
+        @Override
+        public int getStreamFlags() {
+            return Stream.STATE_SIZED;
+        }
+
+        @Override
+        public int getSizeOrEstimate() {
+            return size;
+        }
+
+        @Override
+        public boolean isParallel() {
+            return true;
+        }
+
+        @Override
+        public Stream.Shape getShape() {
+            return Stream.Shape.LINEAR;
+        }
+
+        @Override
+        public Iterator<T> iterator() {
+            return this;
         }
 
         @Override
         public Spliterator<T> spliterator() {
-            return new ArraySpliterator<>(array, offset, length);
+            return this;
         }
 
         @Override
-        public Iterator<T> iterator() {
-            return new ArrayIterator<>(array, offset, length);
-        }
-
-        @Override
-        public int size() {
-            return length;
+        public boolean isPredictableSplits() {
+            return true;
         }
     }
 
@@ -3830,31 +3849,47 @@
 
 
     public static<T> Iterator<T> iterator(T[] array) {
-        return new ArrayIterator<>(array, 0, array.length);
+        return iterator(array, 0, array.length);
     }
 
     public static<T> Iterator<T> iterator(T[] array, int offset, int length) {
         return new ArrayIterator<>(array, offset, length);
     }
 
+    public static<T> Spliterator<T> spliterator(T[] array) {
+        return spliterator(array, 0, array.length);
+    }
+
+    public static<T> Spliterator<T> spliterator(T[] array, int offset, int length) {
+        return new ArraySpliterator<>(array, offset, length);
+    }
+
     public static<T> Iterable<T> iterable(T[] array) {
-        return () -> new ArrayIterator<>(array, 0, array.length);
+        return iterable(array, 0, array.length);
     }
 
     public static<T> Iterable<T> iterable(T[] array, int offset, int length) {
         return () -> new ArrayIterator<>(array, offset, length);
     }
 
-    public static<T> ParallelStreamable<T> parallel(T[] array) {
-        return new ArraySplittable<>(array);
+    public static<T> Stream<T> stream(T[] array) {
+        return stream(array, 0, array.length);
     }
 
-    public static<T> ParallelStreamable<T> parallel(T[] array, int offset, int length) {
-        return new ArraySplittable<>(array, offset, length);
+    public static<T> Stream<T> stream(T[] array, int offset, int length) {
+        return Streams.stream(new ArrayIterator<>(array, offset, length), length);
     }
 
+    public static<T> Stream<T> parallel(T[] array) {
+        return parallel(array, 0, array.length);
+    }
+
+    public static<T> Stream<T> parallel(T[] array, int offset, int length) {
+        return Streams.parallel(new ArrayParallelStreamAccessor<>(array, offset, length));
+    }
+
     public static<T> Sink<T> sink(T[] array) {
-        return new ArraySink<>(array);
+        return sink(array, 0, array.length);
     }
 
     public static<T> Sink<T> sink(T[] array, int offset, int length) {
--- a/src/share/classes/java/util/Collection.java	Mon Aug 20 16:10:37 2012 -0400
+++ b/src/share/classes/java/util/Collection.java	Sat Aug 25 19:03:55 2012 -0400
@@ -26,7 +26,10 @@
 package java.util;
 
 import java.util.functions.Predicate;
-import java.util.streams.SizedStreamable;
+import java.util.functions.Sink;
+import java.util.streams.Stream;
+import java.util.streams.Streamable;
+import java.util.streams.Streams;
 
 /**
  * The root interface in the <i>collection hierarchy</i>.  A collection
@@ -127,7 +130,7 @@
  * @since 1.2
  */
 
-public interface Collection<E> extends SizedStreamable<E>, Iterable<E>, Fillable<E> {
+public interface Collection<E> extends Sized, Streamable<E>, Traversable<E>, Iterable<E>, Fillable<E> {
     // Query Operations
 
     /**
@@ -457,19 +460,6 @@
     int hashCode();
 
     /**
-     * Retains all of the elements of this collection which match the provided
-     * predicate.
-     *
-     * @param filter a predicate which returns {@code true} for elements to be
-     * retained.
-     * @return {@code true} if any elements were retained.
-     */
-    // XXX potential source incompatibility with retainAll(null) now being ambiguous
-    boolean retainAll(Predicate<? super E> filter) default {
-        return CollectionHelpers.retainAll(this, filter);
-    }
-
-    /**
      * Removes all of the elements of this collection which match the provided
      * predicate.
      *
@@ -477,12 +467,37 @@
      * removed.
      * @return {@code true} if any elements were removed.
      */
-    // XXX potential source incompatibility with removeAll(null) now being ambiguous
-    boolean removeAll(Predicate<? super E> filter) default {
-        return CollectionHelpers.removeAll(this, filter);
+    boolean removeIf(Predicate<? super E> filter) default {
+        boolean removed = false;
+        Iterator<E> each = iterator();
+        while(each.hasNext()) {
+            if(filter.test(each.next())) {
+                each.remove();
+                removed = true;
+            }
+        }
+
+        return removed;
     }
 
     void addAll(Iterable<? extends E> source) default {
-        CollectionHelpers.addAll(this, source);
+        if (source instanceof Collection)
+            addAll((Collection<? extends E>) source);
+        else {
+            for (E e : source)
+                add(e);
+        }
+    }
+
+    @Override
+    Stream<E> stream() default {
+        return Streams.stream(this, size());
+    }
+
+    @Override
+    void forEach(Sink<? super E> sink) default {
+        for (E e: this) {
+            sink.accept(e);
+        }
     }
 }
--- a/src/share/classes/java/util/CollectionHelpers.java	Mon Aug 20 16:10:37 2012 -0400
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,87 +0,0 @@
-/*
- * Copyright (c) 1997, 2010, Oracle and/or its affiliates. All rights reserved.
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This code is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 only, as
- * published by the Free Software Foundation.  Oracle designates this
- * particular file as subject to the "Classpath" exception as provided
- * by Oracle in the LICENSE file that accompanied this code.
- *
- * This code is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
- * version 2 for more details (a copy is included in the LICENSE file that
- * accompanied this code).
- *
- * You should have received a copy of the GNU General Public License version
- * 2 along with this work; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
- *
- * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
- * or visit www.oracle.com if you need additional information or have any
- * questions.
- */
-package java.util;
-
-import java.util.functions.Predicate;
-
-/**
- * Extension method implementations for {@code Collection}.
- */
-public final class CollectionHelpers {
-    /**
-     * Retains only elements of the provided collection that return {@code true}
-     * for the provided predicate.
-     *
-     * @param <E> type of collection elements.
-     * @param collection the collection containing the elements to be tested.
-     * @param filter predicate against which elements will be tested.
-     * @return The same collection instance as provided.
-     */
-    public static <E> boolean retainAll(Collection<E> collection, Predicate<? super E> filter ) {
-        boolean retained = false;
-        Iterator<E> each = collection.iterator();
-        while(each.hasNext()) {
-            if(!filter.test(each.next())) {
-                each.remove();
-            } else {
-                retained = true;
-            }
-
-        }
-
-        return retained;
-    }
-
-    /**
-     * Remove elements of the provided collection that return {@code true}
-     * for the provided predicate.
-     *
-     * @param <E> type of collection elements.
-     * @param collection the collection containing the elements to be tested.
-     * @param filter predicate against which elements will be tested.
-     * @return The same collection instance as provided.
-     */
-    public static <E> boolean removeAll(Collection<E> collection, Predicate<? super E> filter ) {
-        boolean removed = false;
-        Iterator<E> each = collection.iterator();
-        while(each.hasNext()) {
-            if(filter.test(each.next())) {
-                each.remove();
-                removed = true;
-            }
-        }
-
-        return removed;
-    }
-
-    public static<E> void addAll(Collection<E> collection, Iterable<? extends E> iterable) {
-        if (iterable instanceof Collection)
-            collection.addAll((Collection<? extends E>) iterable);
-        else {
-            for (E e : iterable)
-                collection.add(e);
-        }
-    }
-}
--- a/src/share/classes/java/util/MapIterator.java	Mon Aug 20 16:10:37 2012 -0400
+++ b/src/share/classes/java/util/MapIterator.java	Sat Aug 25 19:03:55 2012 -0400
@@ -69,11 +69,11 @@
      */
     public static class IteratorAdapter<K, V> implements MapIterator<K, V> {
 
-        protected final Iterator<? super Mapping<K,V>> source;
+        protected final Iterator<? extends Mapping<K,V>> source;
 
         protected Mapping<K, V> current;
 
-        public IteratorAdapter(Iterator<? super Mapping<K, V>> source) {
+        public IteratorAdapter(Iterator<? extends Mapping<K, V>> source) {
             this.source = Objects.requireNonNull(source);
         }
 
@@ -112,7 +112,8 @@
 
         @Override
         public Mapping<K, V> next() {
-            return (current = (Mapping<K,V>) source.next());
+            current = source.next();
+            return current;
         }
     }
 }
--- a/src/share/classes/java/util/StringJoiner.java	Mon Aug 20 16:10:37 2012 -0400
+++ b/src/share/classes/java/util/StringJoiner.java	Sat Aug 25 19:03:55 2012 -0400
@@ -24,7 +24,7 @@
  */
 package java.util;
 
-import java.util.streams.Streamable;
+import java.util.streams.Stream;
 
 /**
  * StringJoiner is used to construct a sequence of characters separated
@@ -334,7 +334,7 @@
      * @return the current value of StringJoiner
      */
     @Override
-    public Streamable<Character> asChars() {
+    public Stream<Character> asChars() {
         return (somethingAdded ? (value.toString() + suffix).asChars() : emptyOutput.asChars() );
     }
 
@@ -352,7 +352,7 @@
      * @return the current value of StringJoiner
      */
     @Override
-    public Streamable<Integer> asCodePoints() {
+    public Stream<Integer> asCodePoints() {
         return (somethingAdded ? (value.toString() + suffix).asCodePoints() : emptyOutput.asCodePoints() );
     }
 
--- a/src/share/classes/java/util/Traversable.java	Mon Aug 20 16:10:37 2012 -0400
+++ b/src/share/classes/java/util/Traversable.java	Sat Aug 25 19:03:55 2012 -0400
@@ -40,5 +40,8 @@
      *
      * @param sink The Sink to which elements will be provided.
      */
-    public void forEach(Sink<? super T> sink);
+    public void forEach(Sink<? super T> sink) default {
+        for (T t : this)
+            sink.accept(t);
+    }
 }
--- a/src/share/classes/java/util/functions/BiSink.java	Mon Aug 20 16:10:37 2012 -0400
+++ b/src/share/classes/java/util/functions/BiSink.java	Sat Aug 25 19:03:55 2012 -0400
@@ -25,6 +25,8 @@
 package java.util.functions;
 
 import java.util.Mapping;
+import java.util.streams.StatefulBiSink;
+import java.util.streams.StatefulSink;
 
 /**
  * BiSink
@@ -37,4 +39,32 @@
     void accept(Mapping<T,U> mapping) default {
         accept(mapping.getKey(), mapping.getValue());
     }
+
+    BiSink<T,U> tee(BiSink<? super T, ? super U> other) default {
+        return (T t, U u) -> { other.accept(t, u); accept(t, u); };
+    }
+
+    @Override
+    StatefulBiSink<T,U,Void> asStatefulSink() default {
+        if (this instanceof StatefulSink)
+            return (StatefulBiSink<T, U, Void>) this;
+        else {
+            // @@@ Compiler bug workaround - have to capture a fake outer 'this'
+            final BiSink<T,U> thiz = this;
+            return new StatefulBiSink<T, U, Void>() {
+                @Override
+                public void begin(int size) { }
+
+                @Override
+                public Void end() {
+                    return null;
+                }
+
+                @Override
+                public void accept(T t, U u) {
+                    thiz.accept(t, u);
+                }
+            };
+        }
+    }
 }
--- a/src/share/classes/java/util/functions/Sink.java	Mon Aug 20 16:10:37 2012 -0400
+++ b/src/share/classes/java/util/functions/Sink.java	Sat Aug 25 19:03:55 2012 -0400
@@ -24,6 +24,8 @@
  */
 package java.util.functions;
 
+import java.util.streams.StatefulSink;
+
 /**
  * A receiver of elements. The counterpart to {@code Stream}.
  *
@@ -42,4 +44,27 @@
     public Sink<T> tee(Sink<? super T> other) default {
         return (T t) -> { other.accept(t); accept(t); };
     }
+
+    public StatefulSink<T, Void> asStatefulSink() default {
+        if (this instanceof StatefulSink)
+            return (StatefulSink<T, Void>) this;
+        else {
+            // @@@ Compiler bug workaround - have to capture a fake outer 'this'
+            final Sink<T> thiz = this;
+            return new StatefulSink<T, Void>() {
+                @Override
+                public void begin(int size) { }
+
+                @Override
+                public Void end() {
+                    return null;
+                }
+
+                @Override
+                public void accept(T t) {
+                    thiz.accept(t);
+                }
+            };
+        }
+    }
 }
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/share/classes/java/util/streams/AbstractPipeline.java	Sat Aug 25 19:03:55 2012 -0400
@@ -0,0 +1,218 @@
+/*
+ * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.  Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+package java.util.streams;
+
+import java.util.*;
+import java.util.concurrent.ForkJoinTask;
+import java.util.concurrent.ForkJoinUtils;
+import java.util.functions.Sink;
+import java.util.streams.ops.*;
+
+/**
+ * AbstractPipeline
+ *
+ * @author Brian Goetz
+ */
+public abstract class AbstractPipeline<T, U> {
+    protected final AbstractPipeline<?, T> upstream;
+    protected final StreamAccessor<?> source;
+    protected final IntermediateOp<T,U> op;
+    protected final int depth;
+
+    private Iterator<U> iterator;
+
+    protected AbstractPipeline(StreamAccessor<?> source) {
+        this.source = Objects.requireNonNull(source);
+        this.op = null;
+        this.upstream = null;
+        this.depth = 0;
+    }
+
+    protected AbstractPipeline(AbstractPipeline<?, T> upstream, IntermediateOp<T,U> op) {
+        this.upstream = Objects.requireNonNull(upstream);
+        this.op = Objects.requireNonNull(op);
+        this.source = upstream.source;
+        this.depth = upstream.depth + 1;
+        assert upstream.getOutputShape() == op.inputShape();
+        assert (upstream.depth == 0) ^ (upstream.op != null);
+    }
+
+    protected<V> V evaluate(TerminalOp<U, V> terminal) {
+        return source.isParallel() ? evaluateParallel(terminal) : evaluateSerial(terminal);
+    }
+
+    protected<V> V evaluateParallel(ParallelOp<U, V> terminal) {
+        IntermediateOp[] ops = new IntermediateOp[depth];
+        boolean stateful = false;
+        boolean shortCircuit = false;
+        AbstractPipeline p = this;
+        for (int i=depth-1; i >= 0; i--) {
+            stateful |= p.op.isStateful();
+            shortCircuit |= p.op.isShortCircuit();
+            ops[i] = p.op;
+            p = p.upstream;
+        }
+
+        if (!stateful)
+            return evaluateParallel(source, terminal, ops);
+        else {
+            int i=0;
+            int segmentStart = 0;
+            StreamAccessor accessor = source;
+
+            while (true) {
+                while (i < ops.length && !ops[i].isStateful())
+                    i++;
+
+                IntermediateOp[] slice = Arrays.copyOfRange(ops, segmentStart, i);
+                if (i < ops.length) {
+                    StatefulOp op = (StatefulOp) ops[i++];
+                    segmentStart = i;
+                    TreeUtils.Node intermediateResult = (TreeUtils.Node) evaluateParallel(accessor, op, slice);
+                    accessor = new Streams.SpliteratorStreamAccessor(intermediateResult.spliterator(), intermediateResult.size());
+                }
+                else {
+                    return evaluateParallel(accessor, terminal, slice);
+                }
+            }
+        }
+    }
+
+    @SuppressWarnings({ "unchecked", "raw" })
+    protected<V> V evaluateParallel(StreamAccessor source, ParallelOp<U, V> terminal, IntermediateOp[] ops) {
+        // @@@ Bake flags and short-circuit constraints into helper
+        return (V) terminal.computeParallel(makeParallelHelper(source, ops));
+    }
+
+    protected<V> V evaluateSerial(TerminalOp<U, V> terminal) {
+
+        IntermediateOp[] ops = new IntermediateOp[depth];
+        boolean intermediateShortCircuit = false;
+        AbstractPipeline p = this;
+        for (int i=depth-1; i >= 0; i--) {
+            intermediateShortCircuit |= p.op.isShortCircuit();
+            ops[i] = p.op;
+            p = p.upstream;
+        }
+
+        if (intermediateShortCircuit || terminal.isShortCircuit() || iterator != null) {
+            // Pull traversal
+            Iterator<U> chain = iterator();
+            StatefulSink<U, V> sink = terminal.sink();
+            sink.begin(-1); // @@@ supply size if known (iterator was null, chain is size-preserving, initial size known)
+            while (chain.hasNext())
+                sink.accept(chain.next());
+            return sink.end();
+        }
+        else {
+            // Push traversal
+            int nStateful = 0;
+            StatefulSink[] statefulSinks = new StatefulSink[ops.length];
+            StatefulSink<U, V> terminalSink = terminal.sink();
+            Sink sink = terminalSink;
+            terminalSink.begin(-1); // @@@ supply size if known to terminal stage
+            for (int i=ops.length-1; i >= 0; i--) {
+                sink = ops[i].sink(sink);
+                if (ops[i].isStateful()) {
+                    StatefulSink s = (StatefulSink) sink;
+                    statefulSinks[nStateful++] = s;
+                    s.begin(-1);  // @@@ supply size if known to *this* stage
+                }
+            }
+            source.forEach(sink);
+            for (int i = nStateful-1; i >= 0; i--)
+                statefulSinks[i].end();
+            return terminalSink.end();
+        }
+    }
+
+    @SuppressWarnings({ "unchecked", "raw" })
+    protected static<U,T> ParallelOp.ParallelOpHelper<U, T> makeParallelHelper(final StreamAccessor<T> source, final IntermediateOp[] ops) {
+        return new ParallelOp.ParallelOpHelper<U, T>() {
+            @Override
+            public int suggestDepth() {
+                int size = source.getSizeOrEstimate();
+                if (size < 0)
+                    size = -size;
+                return ForkJoinUtils.suggestDepth(size);
+            }
+
+            @Override
+            public Spliterator<T> spliterator() {
+                return source.spliterator();
+            }
+
+            @Override
+            public Sink<T> sink(Sink<? super U> sink) {
+                Sink<?> chain = sink;
+                for (int i = ops.length-1; i >= 0; i--) {
+                    IntermediateOp op = ops[i];
+                    chain = op.sink(chain);
+                }
+
+                return (Sink<T>) chain;
+            }
+
+            @Override
+            public Iterator iterator() {
+                Iterator it = source.iterator();
+                for (IntermediateOp op : ops)
+                    it = op.iterator(it);
+                return it;
+            }
+
+            @Override
+            public <Z> Z invoke(ForkJoinTask<Z> task) {
+                return ForkJoinUtils.defaultFJPool().invoke(task);
+            }
+        };
+    }
+
+    public Iterator<U> iterator() {
+        if (iterator == null) {
+            iterator = (Iterator) ((op == null) ? source : op.iterator(upstream.iterator()));
+        }
+        return iterator;
+    }
+
+    private Iterator<U> iterator(StreamAccessor<?> source, IntermediateOp[] stages) {
+        Iterator it = source.iterator();
+        for (IntermediateOp op : stages)
+            it = op.iterator(it);
+        return (Iterator<U>) it;
+    }
+
+    public boolean isParallel() {
+        return source.isParallel();
+    }
+
+    public Stream.Shape getInputShape() {
+        return source.getShape();
+    }
+
+    public Stream.Shape getOutputShape() {
+        return op == null ? source.getShape() : op.outputShape();
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/share/classes/java/util/streams/AbstractSequentialStreamAccessor.java	Sat Aug 25 19:03:55 2012 -0400
@@ -0,0 +1,66 @@
+/*
+ * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.  Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+package java.util.streams;
+
+import java.util.Iterator;
+import java.util.Spliterator;
+import java.util.functions.Sink;
+
+/**
+ * AbstractSequentialStreamAccessor
+ *
+ * @author Brian Goetz
+ */
+public abstract class AbstractSequentialStreamAccessor<T> implements StreamAccessor<T> {
+    @Override
+    public boolean isParallel() {
+        return false;
+    }
+
+    @Override
+    public boolean isPredictableSplits() {
+        return false;
+    }
+
+    @Override
+    public Stream.Shape getShape() {
+        return Stream.Shape.LINEAR;
+    }
+
+    @Override
+    public Spliterator<T> spliterator() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public int getStreamFlags() {
+        return 0;
+    }
+
+    @Override
+    public int getSizeOrEstimate() {
+        return -Integer.MAX_VALUE;
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/share/classes/java/util/streams/LinearPipeline.java	Sat Aug 25 19:03:55 2012 -0400
@@ -0,0 +1,183 @@
+/*
+ * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.  Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+package java.util.streams;
+
+import java.util.Comparator;
+import java.util.Fillable;
+import java.util.Map;
+import java.util.Optional;
+import java.util.functions.*;
+import java.util.streams.ops.*;
+
+/**
+ * LinearPipeline
+ *
+ * @param <T> Type of elements in the upstream source.
+ * @param <U> Type of elements in produced by this stage.
+ *
+ * @author Brian Goetz
+ */
+public class LinearPipeline<T, U> extends AbstractPipeline<T,U> implements Stream<U>  {
+
+    public<S> LinearPipeline(StreamAccessor<S> source) {
+        super(source);
+        assert source.getShape() == Shape.LINEAR;
+    }
+
+    public LinearPipeline(AbstractPipeline<?, T> upstream, IntermediateOp<T, U> op) {
+        super(upstream, op);
+    }
+
+    <V> LinearPipeline<U, V> chainWith(IntermediateOp<U, V> op) {
+        return new LinearPipeline<>(this, op);
+    }
+
+    <V> LinearPipeline<T, V> replaceWith(IntermediateOp<T, V> op) {
+        return new LinearPipeline<>(upstream, op);
+    }
+
+    public<V> Stream<V> pipeline(IntermediateOp<U, V> op) {
+        // @@@ delegate to shape to do instantiation
+        return chainWith(op);
+    }
+
+    public<V> V pipeline(TerminalOp<U, V> op) {
+        return evaluate(op);
+    }
+
+    @Override
+    public Stream<U> filter(Predicate<? super U> predicate) {
+        return pipeline(new FilterOp<>(predicate));
+    }
+
+    @Override
+    public <R> Stream<R> map(Mapper<? super U, ? extends R> mapper) {
+        return pipeline(new MapOp<>(mapper));
+    }
+
+    @Override
+    public <R> Stream<R> flatMap(FlatMapper<? super U, R> mapper) {
+        return pipeline(new FlatMapOp<>(mapper));
+    }
+
+    @Override
+    public Stream<U> uniqueElements() {
+        return pipeline(UniqOp.<U>singleton());
+    }
+
+    @Override
+    public Stream<U> sorted(Comparator<? super U> comparator) {
+        return pipeline(new SortedOp<>(comparator));
+    }
+
+    @Override
+    public Stream<U> cumulate(BinaryOperator<U> operator) {
+        return pipeline(new CumulateOp<>(operator));
+    }
+
+    @Override
+    public void forEach(Sink<? super U> sink) {
+        pipeline(new ForEachOp<>(sink));
+    }
+
+    @Override
+    public <A extends Fillable<? super U>> A into(A target) {
+        forEach(target::add);
+        return target;
+    }
+
+    @Override
+    public <K> Map<K, Streamable<U>> groupBy(Mapper<? super U, ? extends K> mapper) {
+        return pipeline(new GroupByOp<>(mapper));
+    }
+
+    @Override
+    public Object[] toArray() {
+        return pipeline(ToArrayOp.<U>singleton());
+    }
+
+    @Override
+    public boolean anyMatch(Predicate<? super U> predicate) {
+        return pipeline(new AnyMatchOp<>(predicate));
+    }
+
+    @Override
+    public boolean allMatch(Predicate<? super U> predicate) {
+        return pipeline(new AllMatchOp<>(predicate));
+    }
+
+    @Override
+    public boolean noneMatch(Predicate<? super U> predicate) {
+        return pipeline(new NoneMatchOp<>(predicate));
+    }
+
+    @Override
+    public Optional<U> findFirst() {
+        return pipeline(FindFirstOp.<U>singleton());
+    }
+
+    @Override
+    public Optional<U> findAny() {
+        return pipeline(FindAnyOp.<U>singleton());
+    }
+
+    @Override
+    public Stream<U> sequential() {
+        if (!isParallel()) {
+            return this;
+        }
+        else {
+            TreeUtils.Node<U> collected = evaluate(TreeUtils.CollectorOp.<U>singleton());
+            return Streams.stream(collected, collected.size());
+        }
+    }
+
+    @Override
+    public <R> MapStream<U, R> mapped(Mapper<? super U, ? extends R> mapper) {
+        return new MapPipeline<>(this, new MappedOp<>(mapper));
+    }
+
+    @Override
+    public U reduce(final U seed, final BinaryOperator<U> op) {
+        // @@@ Temporary hack to work around compiler error
+        Factory<U> seedFactory = new Factory<U>() {
+            @Override
+            public U make() {
+                return seed;
+            }
+        };
+        return pipeline(new FoldOp<>(seedFactory, op, op));
+    }
+
+    @Override
+    public Optional<U> reduce(BinaryOperator<U> op) {
+        return pipeline(new SeedlessFoldOp<>(op));
+    }
+
+    @Override
+    public <V> V fold(Factory<V> seedFactory, Combiner<V, U, V> reducer, BinaryOperator<V> combiner) {
+        return pipeline(new FoldOp<>(seedFactory, reducer, combiner));
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/share/classes/java/util/streams/MapPipeline.java	Sat Aug 25 19:03:55 2012 -0400
@@ -0,0 +1,169 @@
+/*
+ * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.  Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+package java.util.streams;
+
+import java.util.*;
+import java.util.functions.*;
+import java.util.streams.ops.*;
+
+/**
+ * MapPipeline
+ *
+ * @param <U> Type of elements in the upstream.
+ * @param <K> Type of mapping key.
+ * @param <V> Type of mapping value.
+ *
+ * @author Brian Goetz
+ */
+public class MapPipeline<U, K, V>
+        extends AbstractPipeline<U, Mapping<K,V>>
+        implements MapStream<K, V> {
+    /**
+     * If initialized then we are committed to pull iteration via iterator.
+     */
+    private MapIterator<K,V> iterator;
+
+    public MapPipeline(MapStreamAccessor<K, V> source) {
+        super(source);
+        assert source.getShape() == Stream.Shape.LINEAR;
+    }
+
+    public MapPipeline(AbstractPipeline<?, U> upstream, IntermediateOp<U, Mapping<K, V>> op) {
+        super(upstream, op);
+    }
+
+    <KK, VV> MapPipeline<Mapping<K, V>, KK, VV> chainWith(IntermediateOp<Mapping<K, V>, Mapping<KK, VV>> op) {
+        return new MapPipeline<>(this, op);
+    }
+
+    <KK, VV> MapPipeline<U, KK, VV> replaceWith(IntermediateOp<U, Mapping<KK, VV>> op) {
+        return new MapPipeline<>(upstream, op);
+    }
+
+    @Override
+    public MapIterator<K,V> iterator() {
+       if (iterator == null) {
+           Iterator<Mapping<K, V>> wrapped = (op == null)
+                                             ? ((MapStreamAccessor<K, V>) source)
+                                             : op.iterator(upstream.iterator());
+           iterator = (wrapped instanceof MapIterator)
+                      ? (MapIterator<K, V>) wrapped
+                      : new MapIterator.IteratorAdapter<>(wrapped);
+       }
+        return iterator;
+    }
+
+    @Override
+    public void forEach(BiSink<K, V> sink) {
+        pipeline(new ForEachOp<>(sink));
+    }
+
+    public<U> U pipeline(TerminalOp<Mapping<K,V>, U> op) {
+        return evaluate(op);
+    }
+
+    @Override
+    public <A extends Map<K, V>> A into(A target) {
+        forEach(target::put);
+        return target;
+    }
+
+    @Override
+    public Stream<K> keys() {
+        return new LinearPipeline<>(this, new MapExtractKeysOp<K,V>());
+    }
+
+    @Override
+    public Stream<V> values() {
+        return new LinearPipeline<>(this, new MapExtractValuesOp<K,V>());
+    }
+
+    @Override
+    public MapStream<K, V> filter(BiPredicate<? super K, ? super V> predicate) {
+        return chainWith(new BiFilterOp<>(predicate));
+    }
+
+    @Override
+    public MapStream<K, V> filterKeys(Predicate<? super K> predicate) {
+        return chainWith(new MapFilterKeysOp<K,V>(predicate));
+    }
+
+    @Override
+    public MapStream<K, V> filterValues(Predicate<? super V> predicate) {
+        return chainWith(new MapFilterValuesOp<K,V>(predicate));
+    }
+
+    @Override
+    public <U> MapStream<K, U> mapValues(BiMapper<? super K, ? super V, ? extends U> mapper) {
+        return chainWith(new BiMapOp<>(mapper));
+    }
+
+    @Override
+    public <U> MapStream<K, U> mapValues(Mapper<? super V, ? extends U> mapper) {
+        return chainWith(new MapMapValuesOp<K,V,U>(mapper));
+    }
+
+    @Override
+    public MapStream<V, K> swap() {
+        return chainWith(MapSwapOp.<K,V>singleton());
+    }
+
+    @Override
+    public MapStream<K, V> sorted(Comparator<? super K> comparator) {
+//        return chainWith(new MapSortedOp<K,V>(comparator));
+        return null;
+    }
+
+    @Override
+    public MapStream<K, V> mergeWith(MapStream<K, ? extends V> other) {
+        // !?! mduigou I don't know what semantic was intended.
+        throw new UnsupportedOperationException("nyi");
+    }
+
+    @Override
+    public boolean noneMatch(BiPredicate<? super K, ? super V> predicate) {
+        return allMatch(predicate.negate());
+    }
+
+    @Override
+    public boolean allMatch(BiPredicate<? super K, ? super V> predicate) {
+        return pipeline(new BiAllMatchOp<>(predicate));
+    }
+
+    @Override
+    public boolean anyMatch(BiPredicate<? super K, ? super V> predicate) {
+        return pipeline(new BiAnyMatchOp<>(predicate));
+    }
+
+    @Override
+    public Optional<Mapping<K, V>> findAny() {
+        return pipeline(FindAnyOp.<Mapping<K,V>>singleton());
+    }
+
+    @Override
+    public Optional<Mapping<K, V>> findFirst() {
+        return pipeline(FindFirstOp.<Mapping<K,V>>singleton());
+    }
+}
--- a/src/share/classes/java/util/streams/MapStream.java	Mon Aug 20 16:10:37 2012 -0400
+++ b/src/share/classes/java/util/streams/MapStream.java	Sat Aug 25 19:03:55 2012 -0400
@@ -24,7 +24,8 @@
  */
 package java.util.streams;
 
-import java.util.MapIterator;
+import java.util.*;
+import java.util.functions.*;
 
 /**
  * A stream who's elements are key-value mappings.
@@ -34,5 +35,153 @@
  *
  * @author Brian Goetz
  */
-public interface MapStream<K, V> extends MapStreamOps<K, V>, MapIterator<K, V> {
+public interface MapStream<K, V> {
+    MapIterator<K, V> iterator();
+
+    
+    /**
+     * Return the mapping which is semantically "first" in the stream. If the
+     * stream is not ordered then repeated invocations may return a different
+     * mapping.
+     *
+     * @return the first mapping of the stream if any.
+     */
+    Optional<Mapping<K, V>> findFirst();
+
+    /**
+     * Return any mapping in the stream. Repeated invocations may return the
+     * same mapping or a different mapping.
+     *
+     * @return a mapping in the stream.
+     */
+    Optional<Mapping<K, V>> findAny();
+
+    /**
+     * Return a stream of the keys.
+     *
+     * @return a stream of the keys.
+     */
+    Stream<K> keys();
+
+    /**
+     * Return a stream of the values.
+     *
+     * @return a stream of the values.
+     */
+    Stream<V> values();
+
+    /**
+     * Return a stream filtered by the provided predicate.
+     *
+     * @param predicate the predicate used to filter elements.
+     * @return The filtered stream.
+     */
+    MapStream<K, V> filter(BiPredicate<? super K, ? super V> predicate);
+
+    /**
+     * Return a stream filtered by the provided predicate.
+     *
+     * @param predicate the predicate used to filter elements based upon keys.
+     * @return The filtered stream.
+     */
+    MapStream<K, V> filterKeys(Predicate<? super K> predicate);
+
+    /**
+     * Return a stream filtered by the provided predicate.
+     *
+     * @param predicate the predicate used to filter elements based upon values.
+     * @return The filtered stream.
+     */
+    MapStream<K, V> filterValues(Predicate<? super V> predicate);
+
+    /**
+     * Return a MapStream where the values have been mapped via the provided
+     * Mapper.
+     *
+     * @param <U> Type of mapping values in the returned stream.
+     * @param mapper the predicate used to map elements.
+     * @return The mapped stream.
+     */
+    <U> MapStream<K, U> mapValues(BiMapper<? super K, ? super V, ? extends U> mapper);
+
+    /**
+     * Return a MapStream where the values have been mapped via the provided
+     * Mapper.
+     *
+     * @param <U> Type of mapping values in the returned stream.
+     * @param mapper the predicate used to map elements.
+     * @return The mapped stream.
+     */
+    <U> MapStream<K, U> mapValues(Mapper<? super V, ? extends U> mapper);
+
+    /**
+     * Each element of this stream is processed by the provided sink.
+     *
+     * @param sink the Sink via which all elements will be processed.
+     */
+    void forEach(BiSink<K, V> sink);
+
+    // @@@ Map, or MapFillable?
+    <A extends Map<K, V>> A into(A target);
+
+    /**
+     * Any of the elements of this map stream return {@code true} for the
+     * provided predicate.
+     *
+     * @param predicate The predicate against which elements will be evaluated.
+     * @return {@code true} if any of the elements return {@code true} for the
+     * provided predicate.
+     */
+    boolean anyMatch(BiPredicate<? super K, ? super V> predicate);
+
+    /**
+     * all of the elements of this map stream return {@code true} for the
+     * provided predicate.
+     *
+     * @param predicate The predicate against which elements will be evaluated.
+     * @return {@code true} if all of the elements return {@code true} for the
+     * provided predicate.
+     */
+    boolean allMatch(BiPredicate<? super K, ? super V> predicate);
+
+    /**
+     * None of the elements of this map stream return {@code true} for the
+     * provided predicate.
+     *
+     * @param predicate The predicate against which elements will be evaluated.
+     * @return {@code true} if none of the elements return {@code true} for the
+     * provided predicate.
+     */
+    boolean noneMatch(BiPredicate<? super K, ? super V> predicate);
+
+    /**
+     * Returns a stream composed of the elements o
+     *
+     * ?!? mduigou Intended semantic for handling duplicates? Is concatenation a
+     * valid implementation?
+     *
+     * @param other
+     * @return
+     */
+    MapStream<K, V> mergeWith(MapStream<K, ? extends V> other);
+
+    /**
+     * Returns a map stream containing the same elements sorted in the order
+     * specified by the provided Comparator.
+     *
+     * @param comparator used to evaluate elements to determine their sorted
+     * order.
+     * @return a map stream containing the same elements sorted in the order
+     * specified by the provided Comparator.
+     */
+    MapStream<K, V> sorted(Comparator<? super K> comparator);
+
+    /**
+     * Return a map stream which contains the same elements as this stream but
+     * the keys and values have been swapped.
+     *
+     * @return A map stream which contains the same elements as this stream but
+     * the keys and values have been swapped.
+     */
+    MapStream<V, K> swap();
 }
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/share/classes/java/util/streams/MapStreamAccessor.java	Sat Aug 25 19:03:55 2012 -0400
@@ -0,0 +1,40 @@
+/*
+ * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.  Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+package java.util.streams;
+
+import java.util.MapIterator;
+import java.util.Mapping;
+import java.util.functions.BiSink;
+
+/**
+ * MapStreamAccessor
+ *
+ * @author Brian Goetz
+ */
+public interface MapStreamAccessor<K, V> extends StreamAccessor<Mapping<K,V>>, MapIterator<K,V> {
+    void forEach(BiSink<? super K, ? super V> sink);
+
+    Stream.Shape getShape() default { return Stream.Shape.KEY_VALUE; }
+}
--- a/src/share/classes/java/util/streams/MapStreamOps.java	Mon Aug 20 16:10:37 2012 -0400
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,188 +0,0 @@
-/*
- * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved.
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This code is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 only, as
- * published by the Free Software Foundation.  Oracle designates this
- * particular file as subject to the "Classpath" exception as provided
- * by Oracle in the LICENSE file that accompanied this code.
- *
- * This code is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
- * version 2 for more details (a copy is included in the LICENSE file that
- * accompanied this code).
- *
- * You should have received a copy of the GNU General Public License version
- * 2 along with this work; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
- *
- * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
- * or visit www.oracle.com if you need additional information or have any
- * questions.
- */
-package java.util.streams;
-
-import java.util.Comparator;
-import java.util.Map;
-import java.util.Mapping;
-import java.util.Optional;
-import java.util.functions.*;
-
-/**
- * Operations upon MapStreams.
- *
- * @param <K> Type of mapping key.
- * @param <V> Type of mapping value.
- *
- * @author Brian Goetz
- */
-interface MapStreamOps<K, V> {
-
-    /**
-     * Return the mapping which is semantically "first" in the stream. If the
-     * stream is not ordered then repeated invocations may return a different
-     * mapping.
-     *
-     * @return the first mapping of the stream if any.
-     */
-    Optional<Mapping<K, V>> findFirst();
-
-    /**
-     * Return any mapping in the stream. Repeated invocations may return the
-     * same mapping or a different mapping.
-     *
-     * @return a mapping in the stream.
-     */
-    Optional<Mapping<K, V>> findAny();
-
-    /**
-     * Return a stream of the keys.
-     *
-     * @return a stream of the keys.
-     */
-    Stream<K> keys();
-
-    /**
-     * Return a stream of the values.
-     *
-     * @return a stream of the values.
-     */
-    Stream<V> values();
-
-    /**
-     * Return a stream filtered by the provided predicate.
-     *
-     * @param predicate the predicate used to filter elements.
-     * @return The filtered stream.
-     */
-    MapStream<K, V> filter(BiPredicate<? super K, ? super V> predicate);
-
-    /**
-     * Return a stream filtered by the provided predicate.
-     *
-     * @param predicate the predicate used to filter elements based upon keys.
-     * @return The filtered stream.
-     */
-    MapStream<K, V> filterKeys(Predicate<? super K> predicate);
-
-    /**
-     * Return a stream filtered by the provided predicate.
-     *
-     * @param predicate the predicate used to filter elements based upon values.
-     * @return The filtered stream.
-     */
-    MapStream<K, V> filterValues(Predicate<? super V> predicate);
-
-    /**
-     * Return a MapStream where the values have been mapped via the provided
-     * Mapper.
-     *
-     * @param <U> Type of mapping values in the returned stream.
-     * @param mapper the predicate used to map elements.
-     * @return The mapped stream.
-     */
-    <U> MapStream<K, U> mapValues(BiMapper<? super K, ? super V, ? extends U> mapper);
-
-    /**
-     * Return a MapStream where the values have been mapped via the provided
-     * Mapper.
-     *
-     * @param <U> Type of mapping values in the returned stream.
-     * @param mapper the predicate used to map elements.
-     * @return The mapped stream.
-     */
-    <U> MapStream<K, U> mapValues(Mapper<? super V, ? extends U> mapper);
-
-    /**
-     * Each element of this stream is processed by the provided sink.
-     *
-     * @param sink the Sink via which all elements will be processed.
-     */
-    void forEach(BiSink<K, V> sink);
-
-    // @@@ Map, or MapFillable?
-    <A extends Map<K, V>> A into(A target);
-
-    /**
-     * Any of the elements of this map stream return {@code true} for the
-     * provided predicate.
-     *
-     * @param predicate The predicate against which elements will be evaluated.
-     * @return {@code true} if any of the elements return {@code true} for the
-     * provided predicate.
-     */
-    boolean anyMatch(BiPredicate<? super K, ? super V> predicate);
-
-    /**
-     * all of the elements of this map stream return {@code true} for the
-     * provided predicate.
-     *
-     * @param predicate The predicate against which elements will be evaluated.
-     * @return {@code true} if all of the elements return {@code true} for the
-     * provided predicate.
-     */
-    boolean allMatch(BiPredicate<? super K, ? super V> predicate);
-
-    /**
-     * None of the elements of this map stream return {@code true} for the
-     * provided predicate.
-     *
-     * @param predicate The predicate against which elements will be evaluated.
-     * @return {@code true} if none of the elements return {@code true} for the
-     * provided predicate.
-     */
-    boolean noneMatch(BiPredicate<? super K, ? super V> predicate);
-
-    /**
-     * Returns a stream composed of the elements o
-     *
-     * ?!? mduigou Intended semantic for handling duplicates? Is concatenation a
-     * valid implementation?
-     *
-     * @param other
-     * @return
-     */
-    MapStream<K, V> mergeWith(MapStream<K, ? extends V> other);
-
-    /**
-     * Returns a map stream containing the same elements sorted in the order
-     * specified by the provided Comparator.
-     *
-     * @param comparator used to evaluate elements to determine their sorted
-     * order.
-     * @return a map stream containing the same elements sorted in the order
-     * specified by the provided Comparator.
-     */
-    MapStream<K, V> sorted(Comparator<? super K> comparator);
-
-    /**
-     * Return a map stream which contains the same elements as this stream but
-     * the keys and values have been swapped.
-     *
-     * @return A map stream which contains the same elements as this stream but
-     * the keys and values have been swapped.
-     */
-    MapStream<V, K> swap();
-}
--- a/src/share/classes/java/util/streams/MapStreamable.java	Mon Aug 20 16:10:37 2012 -0400
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,36 +0,0 @@
-/*
- * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved.
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This code is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 only, as
- * published by the Free Software Foundation.  Oracle designates this
- * particular file as subject to the "Classpath" exception as provided
- * by Oracle in the LICENSE file that accompanied this code.
- *
- * This code is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
- * version 2 for more details (a copy is included in the LICENSE file that
- * accompanied this code).
- *
- * You should have received a copy of the GNU General Public License version
- * 2 along with this work; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
- *
- * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
- * or visit www.oracle.com if you need additional information or have any
- * questions.
- */
-package java.util.streams;
-
-/**
- * MapStreamable
- *
- * @param <K> Type of mapping key.
- * @param <V> Type of mapping value.
- *
- * @author Brian Goetz
- */
-public interface MapStreamable<K, V> extends MapStreamOps<K,V> {
-}
--- a/src/share/classes/java/util/streams/ParallelPipeline.java	Mon Aug 20 16:10:37 2012 -0400
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,360 +0,0 @@
-/*
- * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved.
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This code is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 only, as
- * published by the Free Software Foundation.  Oracle designates this
- * particular file as subject to the "Classpath" exception as provided
- * by Oracle in the LICENSE file that accompanied this code.
- *
- * This code is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
- * version 2 for more details (a copy is included in the LICENSE file that
- * accompanied this code).
- *
- * You should have received a copy of the GNU General Public License version
- * 2 along with this work; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
- *
- * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
- * or visit www.oracle.com if you need additional information or have any
- * questions.
- */
-package java.util.streams;
-
-import java.util.*;
-import java.util.concurrent.ForkJoinTask;
-import java.util.concurrent.ForkJoinUtils;
-import java.util.concurrent.RecursiveAction;
-import java.util.concurrent.RecursiveTask;
-import java.util.functions.*;
-import java.util.streams.ops.*;
-
-/**
- * ParallelPipeline
- *
- * @author Brian Goetz
- */
-public class ParallelPipeline<T, U> implements ParallelStream<U> {
-    private final ParallelStreamable<T> base;
-    private final ParallelPipeline<?, T> upstream;
-    private final StatelessOp<T, U> stage;
-
-    public ParallelPipeline(ParallelStreamable<T> base, StatelessOp<T, U> onlyStage) {
-        this.base = base;
-        this.upstream = null;
-        this.stage = onlyStage;
-        assert !stage.isStateful();
-    }
-
-    ParallelPipeline(ParallelPipeline<?, T> upstream, StatelessOp<T, U> newStage) {
-        this.base = null;
-        this.upstream = upstream;
-        this.stage = newStage;
-        assert !stage.isStateful();
-    }
-
-    public static<T> ParallelPipeline<T,T> wrap(ParallelStreamable<T> source) {
-        return new ParallelPipeline<>(source, new IdOp<T>());
-    }
-
-    private <V> ParallelPipeline<T, V> replaceWith(StatelessOp<T, V> newStage) {
-        return base != null
-               ? new ParallelPipeline<>(base, newStage)
-               : new ParallelPipeline<>(upstream, newStage);
-    }
-
-    private <V> ParallelPipeline<U, V> chainWith(StatelessOp<U, V> newStage) {
-        return new ParallelPipeline<>(this, newStage);
-    }
-
-    private Iterator<U> iterator() {
-        return stage.iterator(base == null ? upstream.iterator() : base.iterator());
-    }
-
-    private int getStreamState() {
-        // @@@ Need to add stream-state support to ParallelStreamable
-        return stage.getStreamState(upstream != null ? upstream.getStreamState() : Stream.STATE_SIZED);
-    }
-
-    @Override
-    public ParallelStream<U> filter(Predicate<? super U> predicate) {
-        return chainWith(new FilterOp<>(predicate));
-    }
-
-    @Override
-    public <R> ParallelStream<R> map(Mapper<? super U, ? extends R> mapper) {
-        return chainWith(new MapOp<>(mapper));
-    }
-
-    @Override
-    public <R> ParallelStream<R> flatMap(FlatMapper<? super U, R> mapper) {
-        return chainWith(new FlatMapOp<>(mapper));
-    }
-
-    public<V> V pipeline(EagerOp<U, V> op) {
-        return op.computeParallel(base, makeHelper());
-    }
-
-    public<V> V pipeline(ShortCircuitEagerOp<U, V> op) {
-        return op.computeParallel(base, makeHelper());
-    }
-
-    public<V> ParallelStreamable<V> pipeline(StatefulOp<U, V, ?> op) {
-        return op.computeParallel(base, makeHelper());
-    }
-
-    @Override
-    public ParallelStreamable<U> uniqueElements() {
-        return pipeline(new UniqOp<U>());
-    }
-
-    @Override
-    public ParallelStreamable<U> sorted(Comparator<? super U> comparator) {
-        return pipeline(new SortedOp<>(comparator));
-    }
-
-    @Override
-    public ParallelStreamable<U> cumulate(BinaryOperator<U> operator) {
-        return pipeline(new CumulateOp<>(operator));
-    }
-
-    @Override
-    public void forEach(final Sink<? super U> sink) {
-        int depth = ForkJoinUtils.suggestDepth(base.size());
-        Sink<T> compoundSink = makeSink(sink);
-        Spliterator<T> spliterator = base.spliterator();
-        if (depth == 0) {
-            spliterator.forEach(compoundSink);
-        } else {
-            ForkJoinUtils.defaultFJPool().invoke(new ForEachTask<>(depth, spliterator, compoundSink));
-        }
-    }
-
-    @Override
-    public <A extends Fillable<? super U>> A into(A target) {
-        forEach((U u) -> { target.add(u); });
-        return target;
-    }
-
-    @Override
-    public Object[] toArray() {
-        return pipeline(new ToArrayOp<U>());
-    }
-
-    @Override
-    public <K> Map<K, SizedStreamable<U>> groupBy(Mapper<? super U, ? extends K> mapper) {
-        return pipeline(new GroupByOp<>(mapper));
-    }
-
-    @Override
-    public SizedStreamable<U> sequential() {
-        // @@@ TODO: build more generalized machinery for building trees of splits, pre-sizing arrays, pre-packing, etc
-        ParallelOp.ParallelOpHelper<U, T> helper = makeHelper();
-        int depth = helper.suggestDepth(base);
-        Spliterator<T> spliterator = base.spliterator();
-        if (depth == 0) {
-            int size = spliterator.getRemainingSizeIfKnown();
-            StreamBuilder<U> builder = (size != -1 && (getStreamState() & Stream.STATE_SIZED) != 0)
-                                       ? StreamBuilders.<U>makeFixed(size)
-                                       : StreamBuilders.<U>make();
-            spliterator.forEach(helper.sink(builder));
-            return builder;
-        }
-        else {
-            if (spliterator.isPredictableSplits() && (getStreamState() & Stream.STATE_SIZED) != 0) {
-                int length = spliterator.getRemainingSizeIfKnown();
-                U[] array = (U[]) new Object[length];
-                helper.invoke(new SizedSequentialTask<>(depth, spliterator, helper, array, 0, length));
-                return Arrays.asList(array);
-            }
-            else {
-                return helper.invoke(new SequentialTask<>(depth, spliterator, helper));
-            }
-        }
-    }
-
-    @Override
-    public U reduce(final U seed, final BinaryOperator<U> op) {
-        // @@@ Temporary hack to work around compiler error
-        Factory<U> seedFactory = new Factory<U>() {
-            @Override
-            public U make() {
-                return seed;
-            }
-        };
-        return pipeline(new FoldOp<>(seedFactory, op, op));
-    }
-
-    @Override
-    public Optional<U> reduce(BinaryOperator<U> op) {
-        return pipeline(new SeedlessFoldOp<>(op));
-    }
-
-    @Override
-    public <V> V fold(Factory<V> seedFactory, Combiner<V, U, V> reducer, BinaryOperator<V> combiner) {
-        return pipeline(new FoldOp<>(seedFactory, reducer, combiner));
-    }
-
-    @Override
-    public boolean anyMatch(Predicate<? super U> predicate) {
-        return pipeline(new AnyMatchOp<>(predicate));
-    }
-
-    @Override
-    public boolean allMatch(Predicate<? super U> predicate) {
-        return pipeline(new AllMatchOp<>(predicate));
-    }
-
-    @Override
-    public boolean noneMatch(Predicate<? super U> predicate) {
-        return pipeline(new NoneMatchOp<>(predicate));
-    }
-
-    @Override
-    public Optional<U> findFirst() {
-        return pipeline(new FindFirstOp<U>());
-    }
-
-    @Override
-    public Optional<U> findAny() {
-        return pipeline(new FindAnyOp<U>());
-    }
-
-    private<V> Sink<T> makeSink(Sink<V> sink) {
-        ParallelPipeline<?,?> pipe = this;
-        Sink<?> chain = sink;
-
-        do {
-            StatelessOp<?,?> thisStage = pipe.stage;
-            chain = thisStage.sink( (Sink) chain);
-            pipe = pipe.upstream;
-        } while (pipe != null);
-
-        return (Sink<T>) chain;
-    }
-
-    private<V> Mapper<Sink<V>, Sink<T>> makeSinkFactory() {
-        return new Mapper<Sink<V>, Sink<T>>() {
-            @Override
-            public Sink<T> map(final Sink<V> sink) {
-                return makeSink(sink);
-            }
-        };
-    }
-
-    private ParallelOp.ParallelOpHelper<U, T> makeHelper() {
-        return new ParallelOp.ParallelOpHelper<U, T>() {
-            @Override
-            public int suggestDepth(ParallelStreamable<T> source) {
-                return ForkJoinUtils.suggestDepth(source.size());
-            }
-
-            @Override
-            public Sink<T> sink(Sink<U> sink) {
-                return makeSink(sink);
-            }
-
-            @Override
-            public Iterator<U> iterator() {
-                return ParallelPipeline.this.iterator();
-            }
-
-            @Override
-            public <Z> Z invoke(ForkJoinTask<Z> task) {
-                return ForkJoinUtils.defaultFJPool().invoke(task);
-            }
-        };
-    }
-
-    private static class ForEachTask<T> extends RecursiveAction {
-        private final int depth;
-        private final Spliterator<T> spliterator;
-        private final Sink<T> sink;
-
-        private ForEachTask(int depth, Spliterator<T> spliterator, Sink<T> sink) {
-            this.depth = depth;
-            this.spliterator = spliterator;
-            this.sink = sink;
-        }
-
-        @Override
-        protected void compute() {
-            if (depth == 0) {
-                spliterator.forEach(sink);
-            } else {
-                ForEachTask<T> left = new ForEachTask<>(depth - 1, spliterator.split(), sink);
-                ForEachTask<T> right = new ForEachTask<>(depth - 1, spliterator, sink);
-                right.fork();
-                left.compute();
-                right.join();
-            }
-        }
-    }
-
-    private static class SizedSequentialTask<T, U> extends RecursiveAction {
-        private final int depth;
-        private final Spliterator<T> spliterator;
-        private ParallelOp.ParallelOpHelper<U, T> helper;
-        private final U[] array;
-        private int offset;
-        private int length;
-
-        private SizedSequentialTask(int depth, Spliterator<T> spliterator, ParallelOp.ParallelOpHelper<U, T> helper, U[] array, int offset, int length) {
-            this.depth = depth;
-            this.spliterator = spliterator;
-            this.helper = helper;
-            this.array = array;
-            this.offset = offset;
-            this.length = length;
-        }
-
-        @Override
-        protected void compute() {
-            if (depth == 0) {
-                spliterator.forEach(helper.sink(Arrays.sink(array, offset, length)));
-            }
-            else {
-                Spliterator<T> split = spliterator.split();
-                int leftSize = split.getRemainingSizeIfKnown();
-                assert(leftSize != -1);
-                SizedSequentialTask<T, U> left = new SizedSequentialTask<>(depth-1, split, helper, array, offset, leftSize);
-                SizedSequentialTask<T, U> right = new SizedSequentialTask<>(depth-1, spliterator, helper, array, offset+leftSize, length-leftSize);
-                right.fork();
-                left.compute();
-                right.join();
-            }
-        }
-    }
-
-    private static class SequentialTask<T, U> extends RecursiveTask<SizedStreamable<U>> {
-        private final int depth;
-        private final Spliterator<T> spliterator;
-        private ParallelOp.ParallelOpHelper<U, T> helper;
-
-        public SequentialTask(int depth, Spliterator<T> spliterator, ParallelOp.ParallelOpHelper<U, T> helper) {
-            this.depth = depth;
-            this.spliterator = spliterator;
-            this.helper = helper;
-        }
-
-        @Override
-        protected SizedStreamable<U> compute() {
-            if (depth == 0) {
-                StreamBuilder<U> builder = StreamBuilders.make();
-                spliterator.forEach(helper.sink(builder));
-                return builder;
-            }
-            else {
-                SequentialTask<T, U> left = new SequentialTask<>(depth-1, spliterator.split(), helper);
-                SequentialTask<T, U> right = new SequentialTask<>(depth-1, spliterator, helper);
-                right.fork();
-                SizedStreamable<U> leftResult = left.compute();
-                SizedStreamable<U> rightResult = right.join();
-                return Streams.concatenate(leftResult, rightResult);
-            }
-        }
-    }
-}
--- a/src/share/classes/java/util/streams/ParallelStream.java	Mon Aug 20 16:10:37 2012 -0400
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,34 +0,0 @@
-/*
- * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved.
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This code is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 only, as
- * published by the Free Software Foundation.  Oracle designates this
- * particular file as subject to the "Classpath" exception as provided
- * by Oracle in the LICENSE file that accompanied this code.
- *
- * This code is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
- * version 2 for more details (a copy is included in the LICENSE file that
- * accompanied this code).
- *
- * You should have received a copy of the GNU General Public License version
- * 2 along with this work; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
- *
- * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
- * or visit www.oracle.com if you need additional information or have any
- * questions.
- */
-package java.util.streams;
-
-/**
- * ParallelStream
- *
- * @param <T> Type of elements.
- *
- * @author Brian Goetz
- */
-public interface ParallelStream<T> extends ParallelStreamOps<T> { }
--- a/src/share/classes/java/util/streams/ParallelStreamOps.java	Mon Aug 20 16:10:37 2012 -0400
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,90 +0,0 @@
-/*
- * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved.
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This code is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 only, as
- * published by the Free Software Foundation.  Oracle designates this
- * particular file as subject to the "Classpath" exception as provided
- * by Oracle in the LICENSE file that accompanied this code.
- *
- * This code is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
- * version 2 for more details (a copy is included in the LICENSE file that
- * accompanied this code).
- *
- * You should have received a copy of the GNU General Public License version
- * 2 along with this work; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
- *
- * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
- * or visit www.oracle.com if you need additional information or have any
- * questions.
- */
-package java.util.streams;
-
-import java.util.Comparator;
-import java.util.Fillable;
-import java.util.Map;
-import java.util.Optional;
-import java.util.functions.*;
-
-/**
- * ParallelStreamOps
- *
- * @param <T> Type of elements.
- *
- * @author Brian Goetz
- */
-interface ParallelStreamOps<T> {
-    ParallelStream<T> filter(Predicate<? super T> predicate);
-
-    <R> ParallelStream<R> map(Mapper<? super T, ? extends R> mapper);
-
-    <R> ParallelStream<R> flatMap(FlatMapper<? super T, R> mapper);
-
-    void forEach(Sink<? super T> sink);
-
-    <A extends Fillable<? super T>> A into(A target);
-
-    Object[] toArray();
-
-    <K> Map<K, SizedStreamable<T>> groupBy(Mapper<? super T, ? extends K> mapper);
-
-    /**
-     * Produce an {@link Iterable} representing the contents of this {@code Splittable}.  In general, this method is
-     * only called at the top of a decomposition tree, indicating that operations that produced the {@code Spliterable}
-     * can happen in parallel, but the results are assembled for sequential traversal.  This is designed to support
-     * patterns like:
-     *     collection.filter(t -> t.matches(k))
-     *               .map(t -> t.getLabel())
-     *               .sequential()
-     *               .forEach(e -> println(e));
-     * where the filter / map operations can occur in parallel, and then the results can be traversed
-     * sequentially in a predicatable order.
-     */
-    SizedStreamable<T> sequential();
-
-    ParallelStreamable<T> uniqueElements();
-
-    ParallelStreamable<T> sorted(Comparator<? super T> comparator);
-
-    ParallelStreamable<T> cumulate(BinaryOperator<T> operator);
-
-    T reduce(T base, BinaryOperator<T> op);
-
-    Optional<T> reduce(BinaryOperator<T> op);
-
-    <U> U fold(Factory<U> baseFactory, Combiner<U, T, U> reducer, BinaryOperator<U> combiner);
-
-    boolean anyMatch(Predicate<? super T> predicate);
-
-    boolean allMatch(Predicate<? super T> predicate);
-
-    boolean noneMatch(Predicate<? super T> predicate);
-
-    Optional<T> findFirst();
-
-    Optional<T> findAny();
-}
--- a/src/share/classes/java/util/streams/ParallelStreamable.java	Mon Aug 20 16:10:37 2012 -0400
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,135 +0,0 @@
-/*
- * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved.
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This code is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 only, as
- * published by the Free Software Foundation.  Oracle designates this
- * particular file as subject to the "Classpath" exception as provided
- * by Oracle in the LICENSE file that accompanied this code.
- *
- * This code is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
- * version 2 for more details (a copy is included in the LICENSE file that
- * accompanied this code).
- *
- * You should have received a copy of the GNU General Public License version
- * 2 along with this work; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
- *
- * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
- * or visit www.oracle.com if you need additional information or have any
- * questions.
- */
-package java.util.streams;
-
-import java.util.*;
-import java.util.functions.*;
-import java.util.streams.ops.FilterOp;
-import java.util.streams.ops.FlatMapOp;
-import java.util.streams.ops.MapOp;
-
-/**
- * ParallelStreamable
- *
- * @param <T> Type of elements.
- *
- * @author Brian Goetz
- */
-public interface ParallelStreamable<T> extends ParallelStreamOps<T>, Splittable<T>, Sized {
-    @Override
-    ParallelStream<T> filter(Predicate<? super T> predicate) default {
-        return new ParallelPipeline<>(this, new FilterOp<>(predicate));
-    }
-
-    @Override
-    <R> ParallelStream<R> map(Mapper<? super T, ? extends R> mapper) default {
-        return new ParallelPipeline<>(this, new MapOp<>(mapper));
-    }
-
-    @Override
-    <R> ParallelStream<R> flatMap(FlatMapper<? super T, R> mapper) default {
-        return new ParallelPipeline<>(this, new FlatMapOp<>(mapper));
-    }
-
-    @Override
-    ParallelStreamable<T> uniqueElements() default {
-        return ParallelPipeline.wrap(this).uniqueElements();
-    }
-
-    @Override
-    ParallelStreamable<T> sorted(Comparator<? super T> comparator) default {
-        return ParallelPipeline.wrap(this).sorted(comparator);
-    }
-
-    @Override
-    ParallelStreamable<T> cumulate(BinaryOperator<T> operator) default {
-        return ParallelPipeline.wrap(this).cumulate(operator);
-    }
-
-    @Override
-    public void forEach(Sink<? super T> sink) default {
-        ParallelPipeline.wrap(this).forEach(sink);
-    }
-
-    @Override
-    <A extends Fillable<? super T>> A into(A target) default {
-        return ParallelPipeline.wrap(this).into(target);
-    }
-
-    @Override
-    Object[] toArray() default {
-        return ParallelPipeline.wrap(this).toArray();
-    }
-
-    @Override
-    <K> Map<K, SizedStreamable<T>> groupBy(Mapper<? super T, ? extends K> mapper) default {
-        return ParallelPipeline.wrap(this).groupBy(mapper);
-    }
-
-    @Override
-    public T reduce(T base, BinaryOperator<T> op) default {
-        return ParallelPipeline.wrap(this).reduce(base, op);
-    }
-
-    @Override
-    Optional<T> reduce(BinaryOperator<T> op) default {
-        return ParallelPipeline.wrap(this).reduce(op);
-    }
-
-    @Override
-    public <U> U fold(Factory<U> seedFactory, Combiner<U, T, U> reducer, BinaryOperator<U> combiner) default {
-        return ParallelPipeline.wrap(this).fold(seedFactory, reducer, combiner);
-    }
-
-    @Override
-    public SizedStreamable<T> sequential() default {
-        return ParallelPipeline.wrap(this).sequential();
-    }
-
-    @Override
-    boolean anyMatch(Predicate<? super T> predicate) default {
-        return ParallelPipeline.wrap(this).anyMatch(predicate);
-    }
-
-    @Override
-    boolean allMatch(Predicate<? super T> predicate) default {
-        return ParallelPipeline.wrap(this).allMatch(predicate);
-    }
-
-    @Override
-    boolean noneMatch(Predicate<? super T> predicate) default {
-        return ParallelPipeline.wrap(this).noneMatch(predicate);
-    }
-
-    @Override
-    Optional<T> findAny() default {
-        return ParallelPipeline.wrap(this).findAny();
-    }
-
-    @Override
-    Optional<T> findFirst() default {
-        return ParallelPipeline.wrap(this).findFirst();
-    }
-}
--- a/src/share/classes/java/util/streams/SequentialMapPipeline.java	Mon Aug 20 16:10:37 2012 -0400
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,231 +0,0 @@
-/*
- * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved.
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This code is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 only, as
- * published by the Free Software Foundation.  Oracle designates this
- * particular file as subject to the "Classpath" exception as provided
- * by Oracle in the LICENSE file that accompanied this code.
- *
- * This code is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
- * version 2 for more details (a copy is included in the LICENSE file that
- * accompanied this code).
- *
- * You should have received a copy of the GNU General Public License version
- * 2 along with this work; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
- *
- * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
- * or visit www.oracle.com if you need additional information or have any
- * questions.
- */
-package java.util.streams;
-
-import java.util.*;
-import java.util.functions.*;
-import java.util.streams.ops.*;
-
-/**
- * SequentialMapPipeline
- *
- * @param <U> Type of elements in the upstream.
- * @param <K> Type of mapping key.
- * @param <V> Type of mapping value.
- *
- * @author Brian Goetz
- */
-public class SequentialMapPipeline<U, K, V> implements MapStream<K, V>, Traversable<Mapping<K,V>> {
-    /**
-     * Source of elements
-     */
-    private final Traversable<U> upstream;
-
-    /**
-     * Operation for this stage.
-     */
-    private final ElementwiseOp<U, Mapping<K, V>> stage;
-
-    /**
-     * If initialized then we are committed to pull iteration via iterator.
-     */
-    private MapIterator<K,V> iterator;
-
-    public SequentialMapPipeline(Traversable<U> upstream, ElementwiseOp<U, Mapping<K, V>> stage) {
-        this.upstream = Objects.requireNonNull(upstream);
-        this.stage = Objects.requireNonNull(stage);
-    }
-
-    private<KK,VV> SequentialMapPipeline<U,KK,VV> replaceWith(ElementwiseOp<U, Mapping<KK,VV>> newStage) {
-        return new SequentialMapPipeline<>(upstream, newStage);
-    }
-
-    private<KK,VV> SequentialMapPipeline<Mapping<K,V>,KK,VV> chainWith(ElementwiseOp<Mapping<K,V>, Mapping<KK,VV>> newStage) {
-        return new SequentialMapPipeline<>(this, newStage);
-    }
-
-    @Override
-    public MapIterator<K,V> iterator() {
-       if (iterator == null) {
-            iterator = new MapIterator.IteratorAdapter<>(stage.iterator(upstream.iterator()));
-       }
-        return iterator;
-    }
-
-    @Override
-    public boolean hasNext() {
-        return iterator().hasNext();
-    }
-
-    @Override
-    public Mapping<K, V> next() {
-        return iterator().next();
-    }
-
-    @Override
-    public K nextKey() {
-        return iterator().nextKey();
-    }
-
-    @Override
-    public V nextValue() {
-        return iterator().nextValue();
-    }
-
-    @Override
-    public K curKey() {
-        return iterator().curKey();
-    }
-
-    @Override
-    public V curValue() {
-        return iterator().curValue();
-    }
-
-    private void forEachInto(Sink<U> wrappedSink) {
-        if (stage.isStateful()) {
-            StatefulSink<U, Void> statefulSink = (StatefulSink<U, Void>) wrappedSink;
-            statefulSink.begin(-1);
-            upstream.forEach(wrappedSink);
-            statefulSink.end();
-        } else {
-            upstream.forEach(wrappedSink);
-        }
-    }
-
-    @Override
-    public void forEach(BiSink<K, V> sink) {
-        if (iterator == null) {
-            forEachInto(stage.sink(sink));
-        } else {
-            for (MapIterator<K,V> each = iterator(); each.hasNext(); ) {
-                K key = each.nextKey();
-                V value = each.curValue();
-                sink.accept(key, value);
-            }
-        }
-    }
-
-    @Override
-    public void forEach(Sink<? super Mapping<K, V>> sink) {
-        if (sink instanceof BiSink) {
-            forEach((BiSink<K,V>) (Sink) sink);
-            return;
-        }
-        if (iterator == null) {
-            forEachInto(stage.sink(sink));
-        } else {
-            for (Iterator<Mapping<K,V>> each = iterator(); each.hasNext(); ) {
-                sink.accept(each.next());
-            }
-        }
-    }
-
-   public<U> U pipeline(ShortCircuitEagerOp<Mapping<K,V>, U> op) {
-        return op.evaluate(iterator());
-    }
-
-    @Override
-    public <A extends Map<K, V>> A into(A target) {
-        forEach(target::put);
-        return target;
-    }
-
-    @Override
-    public Stream<K> keys() {
-        return new SequentialPipeline<>(this, new MapExtractKeysOp<K,V>());
-    }
-
-    @Override
-    public Stream<V> values() {
-        return new SequentialPipeline<>(this, new MapExtractValuesOp<K,V>());
-    }
-
-    @Override
-    public MapStream<K, V> filter(BiPredicate<? super K, ? super V> predicate) {
-        return chainWith(new BiFilterOp<>(predicate));
-    }
-
-    @Override
-    public MapStream<K, V> filterKeys(Predicate<? super K> predicate) {
-        return chainWith(new MapFilterKeysOp<K,V>(predicate));
-    }
-
-    @Override
-    public MapStream<K, V> filterValues(Predicate<? super V> predicate) {
-        return chainWith(new MapFilterValuesOp<K,V>(predicate));
-    }
-
-    @Override
-    public <U> MapStream<K, U> mapValues(BiMapper<? super K, ? super V, ? extends U> mapper) {
-        return chainWith(new BiMapOp<>(mapper));
-    }
-
-    @Override
-    public <U> MapStream<K, U> mapValues(Mapper<? super V, ? extends U> mapper) {
-        return chainWith(new MapMapValuesOp<K,V,U>(mapper));
-    }
-
-    @Override
-    public MapStream<V, K> swap() {
-        return chainWith(MapSwapOp.<K,V>singleton());
-    }
-
-    @Override
-    public MapStream<K, V> sorted(Comparator<? super K> comparator) {
-        return chainWith(new MapSortedOp(comparator));
-    }
-
-    @Override
-    public MapStream<K, V> mergeWith(MapStream<K, ? extends V> other) {
-        // !?! mduigou I don't know what semantic was intended.
-        throw new UnsupportedOperationException("nyi");
-    }
-
-    @Override
-    public boolean noneMatch(BiPredicate<? super K, ? super V> predicate) {
-        return allMatch(predicate.negate());
-    }
-
-    @Override
-    public boolean allMatch(BiPredicate<? super K, ? super V> predicate) {
-        return pipeline(new BiAllMatchOp<K,V>(predicate));
-    }
-
-    @Override
-    public boolean anyMatch(BiPredicate<? super K, ? super V> predicate) {
-        return pipeline(new BiAnyMatchOp<K,V>(predicate));
-    }
-
-    @Override
-    public Optional<Mapping<K, V>> findAny() {
-        return pipeline(new FindAnyOp<Mapping<K,V>>());
-    }
-
-    @Override
-    public Optional<Mapping<K, V>> findFirst() {
-        return pipeline(new FindFirstOp<Mapping<K,V>>());
-    }
-}
--- a/src/share/classes/java/util/streams/SequentialPipeline.java	Mon Aug 20 16:10:37 2012 -0400
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,239 +0,0 @@
-/*
- * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved.
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This code is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 only, as
- * published by the Free Software Foundation.  Oracle designates this
- * particular file as subject to the "Classpath" exception as provided
- * by Oracle in the LICENSE file that accompanied this code.
- *
- * This code is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
- * version 2 for more details (a copy is included in the LICENSE file that
- * accompanied this code).
- *
- * You should have received a copy of the GNU General Public License version
- * 2 along with this work; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
- *
- * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
- * or visit www.oracle.com if you need additional information or have any
- * questions.
- */
-package java.util.streams;
-
-import java.util.*;
-import java.util.functions.*;
-import java.util.streams.ops.*;
-
-/**
- * SequentialPipeline
- *
- * @param <T> Type of elements in the upstream source.
- * @param <U> Type of elements in produced by this stage.
- *
- * @author Brian Goetz
- */
-public class SequentialPipeline<T, U> implements Stream<U>, Traversable<U> {
-    /**
-     * Source for elements.
-     */
-    private final Traversable<T> upstream;
-
-    /**
-     * Operation for this stage.
-     */
-    private final ElementwiseOp<T, U> stage;
-
-    /**
-     * If initialized then we are committed to pull iteration via iterator.
-     */
-    private Iterator<U> iterator;
-
-    public SequentialPipeline(Traversable<T> upstream, ElementwiseOp<T,U> stage) {
-        this.upstream = Objects.requireNonNull(upstream);
-        this.stage = Objects.requireNonNull(stage);
-    }
-
-    private<V> SequentialPipeline<T,V> replaceWith(ElementwiseOp<T,V> newStage) {
-        return new SequentialPipeline<>(upstream, newStage);
-    }
-
-    private<V> SequentialPipeline<U,V> chainWith(ElementwiseOp<U,V> newStage) {
-        return new SequentialPipeline<>(this, newStage);
-    }
-
-    public static<T> SequentialPipeline<T,T> wrap(Streamable<T> source) {
-        return new SequentialPipeline<>(source, new IdOp<T>());
-    }
-
-    @Override
-    public Iterator<U> iterator() {
-        if (iterator == null) {
-            iterator = stage.iterator(upstream.iterator());
-        }
-        return iterator;
-    }
-
-    // Calling the Iterator methods forces us into lazy mode
-    @Override
-    public boolean hasNext() {
-        return iterator().hasNext();
-    }
-
-    // Calling the Iterator methods forces us into lazy mode
-    @Override
-    public U next() {
-        return iterator().next();
-    }
-
-    @Override
-    public Stream<U> filter(Predicate<? super U> predicate) {
-        return chainWith(new FilterOp<>(predicate));
-    }
-
-    @Override
-    public <R> Stream<R> map(Mapper<? super U, ? extends R> mapper) {
-        return chainWith(new MapOp<>(mapper));
-    }
-
-    @Override
-    public <R> Stream<R> flatMap(FlatMapper<? super U, R> mapper) {
-        return chainWith(new FlatMapOp<>(mapper));
-    }
-
-    @Override
-    public void forEach(Sink<? super U> sink) {
-        if (iterator == null) {
-            Sink<T> wrappedSink = stage.sink(sink);
-            if (stage.isStateful()) {
-                StatefulSink<U, Void> statefulSink = (StatefulSink<U, Void>) wrappedSink;
-                // @@@ supply size if known
-                statefulSink.begin(-1);
-                upstream.forEach(wrappedSink);
-                statefulSink.end();
-            } else {
-                upstream.forEach(wrappedSink);
-            }
-        } else {
-            for (Iterator<U> each = iterator(); each.hasNext(); ) {
-                sink.accept(each.next());
-            }
-        }
-    }
-
-    @Override
-    public <A extends Fillable<? super U>> A into(A target) {
-        forEach(target::add);
-        return target;
-    }
-
-    @Override
-    public <K> Map<K, SizedStreamable<U>> groupBy(Mapper<? super U, ? extends K> mapper) {
-        return pipeline(new GroupByOp<>(mapper));
-    }
-
-    @Override
-    public Object[] toArray() {
-        return pipeline(ToArrayOp.<U>singleton());
-    }
-
-    public<V> Stream<V> pipeline(StatelessOp<U, V> op) {
-        return chainWith(op);
-    }
-
-    public<V> Stream<V> pipeline(StatefulOp<U, V, ?> op) {
-        return chainWith(op);
-    }
-
-    public<V> V pipeline(EagerOp<U, V> op) {
-        StatefulSink<U, V> sink = op.sink();
-        sink.begin(-1); // @@@ supply size if known
-        forEach(sink);
-        return sink.end();
-    }
-
-    public<V> V pipeline(ShortCircuitEagerOp<U, V> op) {
-        return op.evaluate(iterator());
-    }
-
-    public static<T,U> U eagerOp(Traversable<T> source, EagerOp<T,U> op) {
-        StatefulSink<T,U> sink = op.sink();
-        sink.begin(-1);  // @@@ supply size if known
-        source.forEach(sink);
-        return sink.end();
-    }
-
-    public static<T,U> U eagerOp(Iterable<T> source, ShortCircuitEagerOp<T,U> op) {
-        return op.evaluate(source.iterator());
-    }
-
-    @Override
-    public boolean anyMatch(Predicate<? super U> predicate) {
-        return pipeline(new AnyMatchOp<>(predicate));
-    }
-
-    @Override
-    public boolean allMatch(Predicate<? super U> predicate) {
-        return pipeline(new AllMatchOp<>(predicate));
-    }
-
-    @Override
-    public boolean noneMatch(Predicate<? super U> predicate) {
-        return pipeline(new NoneMatchOp<>(predicate));
-    }
-
-    @Override
-    public Optional<U> findFirst() {
-        return pipeline(new FindFirstOp<U>());
-    }
-
-    @Override
-    public Optional<U> findAny() {
-        return pipeline(new FindAnyOp<U>());
-    }
-
-    @Override
-    public <R> MapStream<U, R> mapped(Mapper<? super U, ? extends R> mapper) {
-        return new SequentialMapPipeline<>(this, new MappedOp<>(mapper));
-    }
-
-    @Override
-    public Stream<U> uniqueElements() {
-        return pipeline(new UniqOp<U>());
-    }
-
-    @Override
-    public Stream<U> sorted(Comparator<? super U> comparator) {
-        return pipeline(new SortedOp<>(comparator));
-    }
-
-    @Override
-    public Stream<U> cumulate(BinaryOperator<U> operator) {
-        return pipeline(new CumulateOp<>(operator));
-    }
-
-    @Override
-    public U reduce(final U seed, final BinaryOperator<U> op) {
-        // @@@ Temporary hack to work around compiler error
-        Factory<U> seedFactory = new Factory<U>() {
-            @Override
-            public U make() {
-                return seed;
-            }
-        };
-        return pipeline(new FoldOp<>(seedFactory, op, op));
-    }
-
-    @Override
-    public Optional<U> reduce(BinaryOperator<U> op) {
-        return pipeline(new SeedlessFoldOp<U>(op));
-    }
-
-    @Override
-    public <V> V fold(Factory<V> seedFactory, Combiner<V, U, V> reducer, BinaryOperator<V> combiner) {
-        return pipeline(new FoldOp<>(seedFactory, reducer, combiner));
-    }
-}
--- a/src/share/classes/java/util/streams/SizedStreamable.java	Mon Aug 20 16:10:37 2012 -0400
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,37 +0,0 @@
-/*
- * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved.
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This code is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 only, as
- * published by the Free Software Foundation.  Oracle designates this
- * particular file as subject to the "Classpath" exception as provided
- * by Oracle in the LICENSE file that accompanied this code.
- *
- * This code is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
- * version 2 for more details (a copy is included in the LICENSE file that
- * accompanied this code).
- *
- * You should have received a copy of the GNU General Public License version
- * 2 along with this work; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
- *
- * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
- * or visit www.oracle.com if you need additional information or have any
- * questions.
- */
-package java.util.streams;
-
-import java.util.Sized;
-
-/**
- * A Streamable that is also Sized.
- *
- * @param <T> Type of stream elements.
- *
- * @author Brian Goetz
- */
-public interface SizedStreamable<T> extends Streamable<T>, Sized {
-}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/share/classes/java/util/streams/StatefulBiSink.java	Sat Aug 25 19:03:55 2012 -0400
@@ -0,0 +1,36 @@
+/*
+ * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.  Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+package java.util.streams;
+
+import java.util.Mapping;
+import java.util.functions.BiSink;
+
+/**
+ * StatefulBiSink
+ *
+ * @author Brian Goetz
+ */
+public interface StatefulBiSink<K,V,R> extends BiSink<K,V>, StatefulSink<Mapping<K,V>,R> {
+}
--- a/src/share/classes/java/util/streams/Stream.java	Mon Aug 20 16:10:37 2012 -0400
+++ b/src/share/classes/java/util/streams/Stream.java	Sat Aug 25 19:03:55 2012 -0400
@@ -24,7 +24,8 @@
  */
 package java.util.streams;
 
-import java.util.Iterator;
+import java.util.*;
+import java.util.functions.*;
 
 /**
  * A source of elements which may be iterated or mutated.
@@ -33,7 +34,9 @@
  *
  * @author Brian Goetz
  */
-public interface Stream<T> extends StreamOps<T>, Iterator<T> {
+public interface Stream<T> {
+    public enum Shape { LINEAR, KEY_VALUE }
+
     /**
      * Stream elements are unique. No two elements contained in the stream
      * are equivalent via {@code equals()}
@@ -60,4 +63,45 @@
      * Mask of undefined state bits.
      */
     public static final int STATE_UNKNOWN_MASK_V1 = ~(STATE_UNIQUE | STATE_SORTED | STATE_SIZED);
+
+    Iterator<T> iterator();
+
+
+    Stream<T> filter(Predicate<? super T> predicate);
+
+    <R> Stream<R> map(Mapper<? super T, ? extends R> mapper);
+
+    <R> Stream<R> flatMap(FlatMapper<? super T, R> mapper);
+
+    Stream<T> uniqueElements();
+
+    Stream<T> sorted(Comparator<? super T> comparator);
+
+    Stream<T> cumulate(BinaryOperator<T> operator);
+
+    void forEach(Sink<? super T> sink);
+
+    <A extends Fillable<? super T>> A into(A target);
+
+    Object[] toArray();
+
+    <U> Map<U, Streamable<T>> groupBy(Mapper<? super T, ? extends U> mapper);
+
+    T reduce(T base, BinaryOperator<T> op);Optional<T> reduce(BinaryOperator<T> op);
+
+    <U> U fold(Factory<U> seedFactory, Combiner<U, T, U> reducer, BinaryOperator<U> combiner);
+
+    boolean anyMatch(Predicate<? super T> predicate);
+
+    boolean allMatch(Predicate<? super T> predicate);
+
+    boolean noneMatch(Predicate<? super T> predicate);
+
+    Optional<T> findFirst();
+
+    Optional<T> findAny();
+
+    Stream<T> sequential();
+
+    <U> MapStream<T, U> mapped(Mapper<? super T, ? extends U> mapper);
 }
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/share/classes/java/util/streams/StreamAccessor.java	Sat Aug 25 19:03:55 2012 -0400
@@ -0,0 +1,52 @@
+/*
+ * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.  Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+package java.util.streams;
+
+import java.util.Iterator;
+import java.util.Spliterator;
+import java.util.functions.Sink;
+
+/**
+ * StreamAccessor
+ *
+ * @author Brian Goetz
+ */
+public interface StreamAccessor<T> {
+    void forEach(Sink<? super T> sink);
+
+    Iterator<T> iterator();
+
+    public int getStreamFlags();
+
+    public int getSizeOrEstimate();
+
+    boolean isParallel();
+
+    boolean isPredictableSplits();
+
+    Stream.Shape getShape();
+
+    Spliterator<T> spliterator();
+}
--- a/src/share/classes/java/util/streams/StreamBuilder.java	Mon Aug 20 16:10:37 2012 -0400
+++ b/src/share/classes/java/util/streams/StreamBuilder.java	Sat Aug 25 19:03:55 2012 -0400
@@ -32,15 +32,14 @@
  *
  * @author Brian Goetz
  */
-public interface StreamBuilder<T> extends SizedStreamable<T>, Sink<T> /* , Fillable<T> */ {
+public interface StreamBuilder<T> extends Sized, Streamable<T>, Traversable<T>, Sink<T> /* , Fillable<T> */ {
     @Override
-    int getStreamState();
+    int getStreamFlags();
 
     @Override
     void forEach(Sink<? super T> sink);
 
     void clear();
 
-    @Override
     Object[] toArray();
 }
--- a/src/share/classes/java/util/streams/StreamBuilders.java	Mon Aug 20 16:10:37 2012 -0400
+++ b/src/share/classes/java/util/streams/StreamBuilders.java	Sat Aug 25 19:03:55 2012 -0400
@@ -28,7 +28,6 @@
 import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Objects;
 import java.util.functions.Sink;
 
 /**
@@ -52,30 +51,38 @@
         return new VariableStreamBuilder<>(initialSize);
     }
 
+    private static boolean equals(StreamBuilder a, StreamBuilder b) {
+        if (a.size() != b.size())
+            return false;
+        Iterator it1 = a.iterator();
+        Iterator it2 = b.iterator();
+        while (it1.hasNext()) {
+            Object x1 = it1.next();
+            Object x2 = it2.next();
+            if (!x1.equals(x2))
+                return false;
+        }
+        return true;
+    }
+
+    private static<T> int hashCode(StreamBuilder<T> sb) {
+        int h = 0;
+        for (T t : sb) {
+            h = 31 * h + t.hashCode();
+        }
+        return h;
+    }
+
     private static class FixedStreamBuilder<T> implements StreamBuilder<T> {
-        private final Object[] array;
+        private final T[] array;
         private int curSize;
 
         public FixedStreamBuilder(int size) {
-            array = new Object[size];
+            array = (T[]) new Object[size];
         }
 
         @Override
-        public boolean equals(Object obj) {
-            if(obj instanceof Streamable) {
-                return Streams.equals(this, (Streamable<?>) obj);
-            }
-
-            return false;
-        }
-
-        @Override
-        public int hashCode() {
-            return Streams.hashCode(this);
-        }
-
-        @Override
-        public int getStreamState() {
+        public int getStreamFlags() {
             return Stream.STATE_SIZED;
         }
 
@@ -91,13 +98,18 @@
         @Override
         public void forEach(Sink<? super T> sink) {
             for (int i=0; i<curSize; i++) {
-                sink.accept((T) array[i]);
+                sink.accept(array[i]);
             }
         }
 
         @Override
+        public Stream<T> stream() {
+            return Streams.stream(this, size());
+        }
+
+        @Override
         public Iterator<T> iterator() {
-            return (Iterator<T>) Arrays.iterator( array, 0, curSize);
+            return Arrays.iterator(array, 0, curSize);
         }
 
         @Override
@@ -116,9 +128,27 @@
                 return array;
             }
             else {
+                // @@@ Should this throw ISE instead?
                 return Arrays.copyOf(array, curSize);
             }
         }
+
+        @Override
+        public int hashCode() {
+            return StreamBuilders.hashCode(this);
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (!(obj instanceof StreamBuilder))
+                return false;
+            return StreamBuilders.equals(this, (StreamBuilder) obj);
+        }
+
+        @Override
+        public String toString() {
+            return String.format("FixedStreamBuilder[%d][%s]", array.length-curSize, Arrays.toString(array));
+        }
     }
 
     private static class VariableStreamBuilder<T> implements StreamBuilder<T> {
@@ -134,21 +164,7 @@
         }
 
         @Override
-        public boolean equals(Object obj) {
-            if(obj instanceof Streamable) {
-                return Streams.equals(this, (Streamable<?>) obj);
-            }
-
-            return false;
-        }
-
-        @Override
-        public int hashCode() {
-            return Streams.hashCode(this);
-        }
-
-        @Override
-        public int getStreamState() {
+        public int getStreamFlags() {
             return Stream.STATE_SIZED;
         }
 
@@ -163,6 +179,11 @@
         }
 
         @Override
+        public Stream<T> stream() {
+            return Streams.stream(this, size());
+        }
+
+        @Override
         public void forEach(Sink<? super T> sink) {
             list.forEach(sink);
         }
@@ -181,6 +202,23 @@
         public Object[] toArray() {
             return list.toArray();
         }
+
+        @Override
+        public int hashCode() {
+            return StreamBuilders.hashCode(this);
+        }
+
+        @Override
+        public String toString() {
+            return String.format("VariableStreamBuilder[%s]", list);
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (!(obj instanceof StreamBuilder))
+                return false;
+            return StreamBuilders.equals(this, (StreamBuilder) obj);
+        }
     }
 
 }
--- a/src/share/classes/java/util/streams/StreamOps.java	Mon Aug 20 16:10:37 2012 -0400
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,80 +0,0 @@
-/*
- * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved.
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This code is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 only, as
- * published by the Free Software Foundation.  Oracle designates this
- * particular file as subject to the "Classpath" exception as provided
- * by Oracle in the LICENSE file that accompanied this code.
- *
- * This code is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
- * version 2 for more details (a copy is included in the LICENSE file that
- * accompanied this code).
- *
- * You should have received a copy of the GNU General Public License version
- * 2 along with this work; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
- *
- * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
- * or visit www.oracle.com if you need additional information or have any
- * questions.
- */
-package java.util.streams;
-
-import java.util.Comparator;
-import java.util.Fillable;
-import java.util.Map;
-import java.util.Optional;
-import java.util.functions.*;
-
-/**
- * StreamOps
- *
- * @params <T> Type of elements.
- *
- * @author Brian Goetz
- */
-public interface StreamOps<T> {
-    Stream<T> filter(Predicate<? super T> predicate);
-
-    <R> Stream<R> map(Mapper<? super T, ? extends R> mapper);
-
-    <R> Stream<R> flatMap(FlatMapper<? super T, R> mapper);
-
-    Stream<T> uniqueElements();
-
-    Stream<T> sorted(Comparator<? super T> comparator);
-
-    Stream<T> cumulate(BinaryOperator<T> operator);
-
-    void forEach(Sink<? super T> sink);
-
-    <A extends Fillable<? super T>> A into(A target);
-
-    Object[] toArray();
-
-    <U> Map<U, SizedStreamable<T>> groupBy(Mapper<? super T, ? extends U> mapper);
-
-//    <U> Map<U, StreamSource<T>> groupByMulti(FlatMapper<? super T, ? extends U> mapper);
-
-    T reduce(T base, BinaryOperator<T> op);
-
-    Optional<T> reduce(BinaryOperator<T> op);
-
-    <U> U fold(Factory<U> seedFactory, Combiner<U, T, U> reducer, BinaryOperator<U> combiner);
-
-    boolean anyMatch(Predicate<? super T> predicate);
-
-    boolean allMatch(Predicate<? super T> predicate);
-
-    boolean noneMatch(Predicate<? super T> predicate);
-
-    Optional<T> findFirst();
-
-    Optional<T> findAny();
-
-    <U> MapStream<T, U> mapped(Mapper<? super T, ? extends U> mapper);
-}
--- a/src/share/classes/java/util/streams/Streamable.java	Mon Aug 20 16:10:37 2012 -0400
+++ b/src/share/classes/java/util/streams/Streamable.java	Sat Aug 25 19:03:55 2012 -0400
@@ -35,119 +35,10 @@
  *
  * @author Brian Goetz
  */
-public interface Streamable<T> extends StreamOps<T>, Traversable<T> {
-    Stream<T> stream() default {
-        throw new UnsupportedOperationException("nyi");
-    }
+public interface Streamable<T> {
+    Stream<T> stream();
 
-    public int getStreamState() default {
+    public int getStreamFlags() default {
         return 0;
     }
-
-    @Override
-    void forEach(Sink<? super T> sink) default {
-        for (T t: this) {
-            sink.accept(t);
-        }
-    }
-
-    @Override
-    <A extends Fillable<? super T>> A into(A target) default {
-        for (T t : this) {
-            target.add(t);
-        }
-        return target;
-    }
-
-    @Override
-    Object[] toArray() default {
-        return SequentialPipeline.eagerOp(this, ToArrayOp.<T>singleton());
-    }
-
-    @Override
-    <K> Map<K, SizedStreamable<T>> groupBy(Mapper<? super T, ? extends K> mapper) default {
-        return SequentialPipeline.eagerOp(this, new GroupByOp<>(mapper));
-    }
-
-    @Override
-    Stream<T> filter(Predicate<? super T> predicate) default {
-        return new SequentialPipeline<>(this, new FilterOp<>(predicate));
-    }
-
-    @Override
-    <R> Stream<R> map(Mapper<? super T, ? extends R> mapper) default {
-        return new SequentialPipeline<>(this, new MapOp<>(mapper));
-    }
-
-    @Override
-    <R> Stream<R> flatMap(FlatMapper<? super T, R> mapper) default {
-        return new SequentialPipeline<>(this, new FlatMapOp<>(mapper));
-    }
-
-    @Override
-    Stream<T> uniqueElements() default {
-        return new SequentialPipeline<>(this, new UniqOp<T>());
-    }
-
-    @Override
-    Stream<T> sorted(Comparator<? super T> comparator) default {
-        return new SequentialPipeline<>(this, new SortedOp<>(comparator));
-    }
-
-    @Override
-    Stream<T> cumulate(BinaryOperator<T> operator) default {
-        return new SequentialPipeline<>(this, new CumulateOp<>(operator));
-    }
-
-    @Override
-    public T reduce(final T base, BinaryOperator<T> op) default {
-        // @@@ Temporary hack to work around compiler error
-        Factory<T> seedFactory = new Factory<T>() {
-            @Override
-            public T make() {
-                return base;
-            }
-        };
-        return fold(seedFactory, op, op);
-    }
-
-    @Override
-    public Optional<T> reduce(BinaryOperator<T> op) default {
-        return SequentialPipeline.eagerOp(this, new SeedlessFoldOp<>(op));
-    }
-
-    @Override
-    public <U> U fold(Factory<U> seedFactory, Combiner<U, T, U> reducer, BinaryOperator<U> combiner) default {
-        return SequentialPipeline.eagerOp(this, new FoldOp<>(seedFactory, reducer, combiner));
-    }
-
-    @Override
-    Optional<T> findFirst() default {
-        return SequentialPipeline.eagerOp(this, new FindFirstOp<T>());
-    }
-
-    @Override
-    <R> MapStream<T, R> mapped(Mapper<? super T, ? extends R> mapper) default {
-        return new SequentialMapPipeline<>(this, new MappedOp<>(mapper));
-    }
-
-    @Override
-    Optional<T> findAny() default {
-        return SequentialPipeline.eagerOp(this, new FindAnyOp<T>());
-    }
-
-    @Override
-    boolean anyMatch(Predicate<? super T> predicate) default {
-        return SequentialPipeline.eagerOp(this, new AnyMatchOp<>(predicate));
-    }
-
-    @Override
-    boolean allMatch(Predicate<? super T> predicate) default {
-        return SequentialPipeline.eagerOp(this, new AllMatchOp<>(predicate));
-    }
-
-    @Override
-    boolean noneMatch(Predicate<? super T> predicate) default {
-        return SequentialPipeline.eagerOp(this, new NoneMatchOp<>(predicate));
-    }
 }
--- a/src/share/classes/java/util/streams/Streams.java	Mon Aug 20 16:10:37 2012 -0400
+++ b/src/share/classes/java/util/streams/Streams.java	Sat Aug 25 19:03:55 2012 -0400
@@ -37,63 +37,44 @@
         throw new Error("no instances");
     }
 
-    public static<T, U extends Iterator<T>> Streamable<T> asStreamable(final U iterator) {
-        return new Streamable<T>() {
-            @Override
-            public Iterator<T> iterator() {
-                return iterator;
-            }
-        };
+    public static<T> Stream<T> stream(Collection<T> source) {
+        return new LinearPipeline<>(new TraversableStreamAccessor<>(source, source.size()));
     }
 
-    public static<T, U extends Iterable<T> & Sized> SizedStreamable<T> concatenate(final U left, final U right) {
-        return new SizedStreamable<T>() {
-            int size = 0;
-
-            @Override
-            public int size() {
-                if (size == 0) {
-                    size = left.size() + right.size();
-                }
-                return size;
-            }
-
-            @Override
-            public Iterator<T> iterator() {
-                return concatenate(left.iterator(), right.iterator());
-            }
-        };
+    public static<T> Stream<T> stream(Traversable<T> source) {
+        return new LinearPipeline<>(new TraversableStreamAccessor<>(source));
     }
 
-    public static<T> Iterator<T> concatenate(final Iterator<? extends T> left, final Iterator<? extends T> right) {
-        return new Iterator<T>() {
-            private Iterator<? extends T> it = left;
-            private boolean switched = false;
+    public static<T> Stream<T> stream(Traversable<T> source, int knownSize) {
+        return new LinearPipeline<>(new TraversableStreamAccessor<>(source, knownSize));
+    }
 
-            @Override
-            public boolean hasNext() {
-                if (it.hasNext()) {
-                    return true;
-                } else if (switched) {
-                    return false;
-                } else {
-                    switched = true;
-                    it = right;
-                    return it.hasNext();
-                }
-            }
+    public static<T> Stream<T> stream(Iterable<T> source) {
+        return stream(source.iterator());
+    }
 
-            @Override
-            public T next() {
-                if (hasNext()) {
-                    return it.next();
-                } else {
-                    throw new NoSuchElementException();
-                }
-            }
-        };
+    public static<T> Stream<T> stream(Iterable<T> source, int knownSize) {
+        return stream(source.iterator(), knownSize);
     }
 
+    public static<T> Stream<T> stream(Iterator<T> source) {
+        return new LinearPipeline<>(new IteratorStreamAccessor<>(source));
+    }
+
+    public static<T> Stream<T> stream(Iterator<T> source, int knownSize) {
+        return new LinearPipeline<>(new IteratorStreamAccessor<>(source, knownSize));
+    }
+
+    public static<T> Stream<T> parallel(Spliterator<T> source) {
+        return new LinearPipeline<>(new SpliteratorStreamAccessor<>(source));
+    }
+
+    public static<T> Stream<T> parallel(Spliterator<T> source, int knownSize) {
+        return new LinearPipeline<>(new SpliteratorStreamAccessor<>(source, knownSize));
+    }
+
+    // @@@ Need from(StreamAccessor) methods
+
     public static<T> Spliterator<T> emptySpliterator() {
         return new Spliterator<T>() {
             @Override
@@ -126,50 +107,153 @@
         };
     }
 
-    public static<T, U extends Traversable<T> & Sized> Spliterator<T> spliterator(U source) {
-        return new SizedStreamableSpliterator<>(source);
-    }
+    private static class IteratorStreamAccessor<T>
+            extends AbstractSequentialStreamAccessor<T>
+            implements StreamAccessor<T>, Iterator<T> {
+        private final Iterator<T> it;
+        private final int size;
 
-    private static class SizedStreamableSpliterator<T, U extends Traversable<T> & Sized> implements Spliterator<T> {
-        private final U source;
-        private Iterator<T> iterator;
+        public IteratorStreamAccessor(Iterator<T> it) {
+            this.it = it;
+            this.size = -1;
+        }
 
-        private SizedStreamableSpliterator(U source) {
-            this.source = source;
+        public IteratorStreamAccessor(Iterator<T> it, int knownSize) {
+            this.it = it;
+            this.size = knownSize;
         }
 
         @Override
-        public Spliterator<T> split() {
-            return emptySpliterator();
+        public void forEach(Sink<? super T> sink) {
+            while (it.hasNext())
+                sink.accept(it.next());
         }
 
-        private Iterator<T> iterator() {
-            if (iterator == null) {
-                iterator = source.iterator();
-            }
+        @Override
+        public T next() {
+            return it.next();
+        }
+
+        @Override
+        public boolean hasNext() {
+            return it.hasNext();
+        }
+
+        @Override
+        public Iterator<T> iterator() {
+            return it;
+        }
+
+        @Override
+        public int getStreamFlags() {
+            int flags = super.getStreamFlags();
+            if (size >= 0)
+                flags |= Stream.STATE_SIZED;
+            return flags;
+        }
+
+        @Override
+        public int getSizeOrEstimate() {
+            return (size >= 0) ? size : super.getSizeOrEstimate();
+        }
+    }
+
+    static class SpliteratorStreamAccessor<T> implements StreamAccessor<T> {
+        private final Spliterator<T> spliterator;
+        private final int size;
+
+        public SpliteratorStreamAccessor(Spliterator<T> spliterator) {
+            this.spliterator = spliterator;
+            this.size = -1;
+        }
+
+        public SpliteratorStreamAccessor(Spliterator<T> spliterator, int knownSize) {
+            this.spliterator = spliterator;
+            this.size = knownSize;
+        }
+
+        @Override
+        public void forEach(Sink<? super T> sink) {
+            spliterator.forEach(sink);
+        }
+
+        @Override
+        public int getStreamFlags() {
+            int flags = 0;
+            if (size >= 0)
+                flags |= Stream.STATE_SIZED;
+            return flags;
+        }
+
+        @Override
+        public int getSizeOrEstimate() {
+            return (size >= 0) ? size : -Integer.MAX_VALUE;
+        }
+
+        @Override
+        public Iterator<T> iterator() {
+            return spliterator;
+        }
+
+        @Override
+        public Spliterator<T> spliterator() {
+            return spliterator;
+        }
+
+        @Override
+        public Stream.Shape getShape() {
+            return Stream.Shape.LINEAR;
+        }
+
+        @Override
+        public boolean isPredictableSplits() {
+            return spliterator.isPredictableSplits();
+        }
+
+        @Override
+        public boolean isParallel() {
+            return true;
+        }
+    }
+
+    private static class TraversableStreamAccessor<T>
+            extends AbstractSequentialStreamAccessor<T>
+            implements StreamAccessor<T>, Iterator<T> {
+        private final Traversable<T> traversable;
+        private final int size;
+        Iterator<T> iterator = null;
+
+        TraversableStreamAccessor(Traversable<T> traversable) {
+            this.traversable = traversable;
+            this.size = -1;
+        }
+
+        TraversableStreamAccessor(Traversable<T> traversable, int knownSize) {
+            this.traversable = traversable;
+            this.size = knownSize;
+        }
+
+        public Iterator<T> iterator() {
+            if (iterator == null)
+                iterator = traversable.iterator();
             return iterator;
         }
 
         @Override
         public void forEach(Sink<? super T> sink) {
             if (iterator == null) {
-                source.forEach(sink);
+                traversable.forEach(sink);
                 iterator = Collections.emptyIterator();
-            } else {
-                while (iterator.hasNext()) {
+            }
+            else {
+                while (iterator.hasNext())
                     sink.accept(iterator.next());
-                }
             }
         }
 
         @Override
-        public int getRemainingSizeIfKnown() {
-            return iterator == null ? source.size() : -1;
-        }
-
-        @Override
-        public boolean isPredictableSplits() {
-            return true;
+        public T next() {
+            return iterator().next();
         }
 
         @Override
@@ -178,39 +262,16 @@
         }
 
         @Override
-        public T next() {
-            return iterator().next();
+        public int getStreamFlags() {
+            int flags = super.getStreamFlags();
+            if (size >= 0)
+                flags |= Stream.STATE_SIZED;
+            return flags;
+        }
+
+        @Override
+        public int getSizeOrEstimate() {
+            return (size >= 0) ? size : super.getSizeOrEstimate();
         }
     }
-
-    public static boolean equals(Streamable<?> one, Streamable<?> two) {
-        Iterator<?> each = two.iterator();
-        for (Object first : one) {
-            if (!each.hasNext()) {
-                return false;
-            }
-
-            Object second = each.next();
-
-            if (!Objects.equals(first, second)) {
-                return false;
-            }
-        }
-
-        if (each.hasNext()) {
-            return false;
-        }
-
-        return true;
-    }
-
-    public static int hashCode(Streamable<?> streamable) {
-        int result = -1;
-
-        for(Object each : streamable) {
-            result = 17 * (result ^ Objects.hashCode(each));
-        }
-
-        return result;
-    }
 }
--- a/src/share/classes/java/util/streams/ops/AllMatchOp.java	Mon Aug 20 16:10:37 2012 -0400
+++ b/src/share/classes/java/util/streams/ops/AllMatchOp.java	Sat Aug 25 19:03:55 2012 -0400
@@ -27,14 +27,13 @@
 import java.util.Iterator;
 import java.util.Objects;
 import java.util.functions.Predicate;
-import java.util.streams.ParallelStreamable;
 
 /**
  * AllMatchOp
  *
  * @author Brian Goetz
  */
-public class AllMatchOp<T> implements ShortCircuitEagerOp<T, Boolean> {
+public class AllMatchOp<T> implements ShortCircuitTerminalOp<T, Boolean> {
     private final Predicate<? super T> predicate;
 
     public AllMatchOp(Predicate<? super T> predicate) {
@@ -57,7 +56,7 @@
     }
 
     @Override
-    public <V> Boolean computeParallel(ParallelStreamable<V> source, ParallelOpHelper<T, V> helper) {
+    public <V> Boolean computeParallel(ParallelOpHelper<T, V> helper) {
         throw new UnsupportedOperationException("nyi");
     }
 }
--- a/src/share/classes/java/util/streams/ops/AnyMatchOp.java	Mon Aug 20 16:10:37 2012 -0400
+++ b/src/share/classes/java/util/streams/ops/AnyMatchOp.java	Sat Aug 25 19:03:55 2012 -0400
@@ -27,7 +27,6 @@
 import java.util.Iterator;
 import java.util.Objects;
 import java.util.functions.Predicate;
-import java.util.streams.ParallelStreamable;
 
 /**
  * AnyMatchOp
@@ -36,7 +35,7 @@
  *
  * @author Brian Goetz
  */
-public class AnyMatchOp<T> implements ShortCircuitEagerOp<T, Boolean> {
+public class AnyMatchOp<T> implements ShortCircuitTerminalOp<T, Boolean> {
     private final Predicate<? super T> predicate;
 
     public AnyMatchOp(Predicate<? super T> predicate) {
@@ -59,7 +58,7 @@
     }
 
     @Override
-    public <V> Boolean computeParallel(ParallelStreamable<V> source, ParallelOpHelper<T, V> helper) {
+    public <V> Boolean computeParallel(ParallelOpHelper<T, V> helper) {
         throw new UnsupportedOperationException("nyi");
     }
 }
--- a/src/share/classes/java/util/streams/ops/BiAllMatchOp.java	Mon Aug 20 16:10:37 2012 -0400
+++ b/src/share/classes/java/util/streams/ops/BiAllMatchOp.java	Sat Aug 25 19:03:55 2012 -0400
@@ -29,13 +29,12 @@
 import java.util.Mapping;
 import java.util.Objects;
 import java.util.functions.BiPredicate;
-import java.util.streams.ParallelStreamable;
 
 /**
  * BiAllMatchOp
  *
  */
-public class BiAllMatchOp<K,V> implements ShortCircuitEagerOp<Mapping<K,V>, Boolean> {
+public class BiAllMatchOp<K,V> implements ShortCircuitTerminalOp<Mapping<K,V>, Boolean> {
     private final BiPredicate<? super K, ? super V> predicate;
 
     public BiAllMatchOp(BiPredicate<? super K, ? super V> predicate) {
--- a/src/share/classes/java/util/streams/ops/BiAnyMatchOp.java	Mon Aug 20 16:10:37 2012 -0400
+++ b/src/share/classes/java/util/streams/ops/BiAnyMatchOp.java	Sat Aug 25 19:03:55 2012 -0400
@@ -29,13 +29,12 @@
 import java.util.Mapping;
 import java.util.Objects;
 import java.util.functions.BiPredicate;
-import java.util.streams.ParallelStreamable;
 
 /**
  * BiAnyMatchOp
  *
  */
-public class BiAnyMatchOp<K,V> implements ShortCircuitEagerOp<Mapping<K,V>, Boolean> {
+public class BiAnyMatchOp<K,V> implements ShortCircuitTerminalOp<Mapping<K,V>, Boolean> {
     private final BiPredicate<? super K, ? super V> predicate;
 
     public BiAnyMatchOp(BiPredicate<? super K, ? super V> predicate) {
--- a/src/share/classes/java/util/streams/ops/BiFilterOp.java	Mon Aug 20 16:10:37 2012 -0400
+++ b/src/share/classes/java/util/streams/ops/BiFilterOp.java	Sat Aug 25 19:03:55 2012 -0400
@@ -47,7 +47,7 @@
     }
 
     @Override
-    public int getStreamState(int upstreamState) {
+    public int getStreamFlags(int upstreamState) {
         return upstreamState & ~(Stream.STATE_SIZED | Stream.STATE_UNKNOWN_MASK_V1);
     }
 
@@ -64,9 +64,9 @@
     @Override
     public Sink<Mapping<K,V>> sink(final Sink<? super Mapping<K,V>> sink) {
         if (!(sink instanceof BiSink)) {
-            throw new IllegalStateException("Expecting BiSink");
+            throw new IllegalArgumentException(String.format("Expected argument sink to be of type BiSync; found %s", sink.getClass()));
         }
-        final BiSink<K, V> biSink = (BiSink<K, V>) (Sink) Objects.requireNonNull(sink);
+        final BiSink<K, V> biSink = (BiSink<K, V>) (BiSink) Objects.requireNonNull(sink);
 
         return new BiSink<K,V>() {
             @Override
@@ -84,7 +84,7 @@
         };
     }
 
-    public static <K, V> MapIterator<K, V> iterator(Iterator<? super Mapping<K, V>> source, BiPredicate<? super K, ? super V> predicate) {
+    public static <K, V> MapIterator<K, V> iterator(Iterator<Mapping<K, V>> source, BiPredicate<? super K, ? super V> predicate) {
         return new FilteredIterator<>(source, predicate);
     }
 
@@ -94,7 +94,7 @@
 
         private boolean nextReady = false;
 
-        FilteredIterator(Iterator<? super Mapping<K, V>> source, BiPredicate<? super K, ? super V> predicate) {
+        FilteredIterator(Iterator<Mapping<K, V>> source, BiPredicate<? super K, ? super V> predicate) {
             super(source);
             this.predicate = Objects.requireNonNull(predicate);
         }
@@ -102,7 +102,7 @@
         @Override
         public boolean hasNext() {
             while (!nextReady && source.hasNext()) {
-                current = (Mapping<K, V>)source.next();
+                current = source.next();
                 nextReady = predicate.eval(current.getKey(), current.getValue());
             }
 
--- a/src/share/classes/java/util/streams/ops/BiMapOp.java	Mon Aug 20 16:10:37 2012 -0400
+++ b/src/share/classes/java/util/streams/ops/BiMapOp.java	Sat Aug 25 19:03:55 2012 -0400
@@ -52,7 +52,7 @@
     }
 
     @Override
-    public int getStreamState(int upstreamState) {
+    public int getStreamFlags(int upstreamState) {
         return upstreamState & ~(Stream.STATE_SORTED | Stream.STATE_UNIQUE | Stream.STATE_UNKNOWN_MASK_V1);
     }
 
@@ -67,7 +67,7 @@
     }
 
     @Override
-    public Sink<Mapping<K,V>> sink(final Sink<? super Mapping<K,U>> sink) {
+    public BiSink<K,V> sink(final Sink<? super Mapping<K,U>> sink) {
         if (!(sink instanceof BiSink)) {
             throw new IllegalStateException("Expecting BiSink");
         }
--- a/src/share/classes/java/util/streams/ops/CumulateOp.java	Mon Aug 20 16:10:37 2012 -0400
+++ b/src/share/classes/java/util/streams/ops/CumulateOp.java	Sat Aug 25 19:03:55 2012 -0400
@@ -46,7 +46,7 @@
     }
 
     @Override
-    public int getStreamState(int upstreamState) {
+    public int getStreamFlags(int upstreamState) {
         return upstreamState & ~(Stream.STATE_SORTED | Stream.STATE_UNIQUE | Stream.STATE_UNKNOWN_MASK_V1);
     }
 
@@ -110,25 +110,8 @@
     }
 
     @Override
-    public <V> ParallelStreamable<T> computeParallel(ParallelStreamable<V> source, ParallelOpHelper<T, V> helper) {
-        ForkJoinTask<Node<T>> task = new CumulateTask<>(helper.suggestDepth(source), source.spliterator(), op, helper);
-        final Node<T> node = helper.invoke(task);
-        return new ParallelStreamable<T>() {
-            @Override
-            public int size() {
-                return node.size();
-            }
-
-            @Override
-            public Iterator<T> iterator() {
-                return node.iterator();
-            }
-
-            @Override
-            public Spliterator<T> spliterator() {
-                return node.spliterator();
-            }
-        };
+    public <V> TreeUtils.Node<T> computeParallel(ParallelOpHelper<T, V> helper) {
+        return helper.invoke(new CumulateTask<>(helper.suggestDepth(), helper.spliterator(), op, helper));
     }
 
     private static class Problem<T,V> {
@@ -144,7 +127,7 @@
         }
     }
 
-    private class CumulateTask<V> extends RecursiveTask<Node<T>> {
+    private class CumulateTask<V> extends RecursiveTask<TreeUtils.Node<T>> {
         private final Problem<T,V> problem;
         private final int depth;
         private final Spliterator<V> source;
@@ -175,7 +158,7 @@
         }
 
         @Override
-        protected Node<T> compute() {
+        protected TreeUtils.Node<T> compute() {
             switch (problem.pass) {
                 case 0:
                     if (depth != 0) {
@@ -199,7 +182,7 @@
                         upward = sink.end();
                         // Special case -- if problem.depth == 0, just wrap the result and be done
                         if (isRoot())
-                            return new LeafNode<>(Arrays.asList((T[]) leafData.toArray()));
+                            return TreeUtils.node(leafData);
                     }
                     return null;
 
@@ -216,9 +199,9 @@
                             right.downward = problem.op.operate(downward, left.upward);
                         }
                         right.fork();
-                        Node<T> leftResult = left.compute();
-                        Node<T> rightResult = right.join();
-                        return new InternalNode<>(leftResult, rightResult);
+                        TreeUtils.Node<T> leftResult = left.compute();
+                        TreeUtils.Node<T> rightResult = right.join();
+                        return TreeUtils.node(leftResult, rightResult);
                     }
                     else {
                         // @@@ Pretty inefficient; should update in place when we have a better StreamBuilder representation
@@ -230,7 +213,7 @@
                             for (T t : leafData)
                                 result.accept(problem.op.operate(downward, t));
                         }
-                        return new LeafNode<>(result);
+                        return TreeUtils.node(result);
                     }
 
                 default:
@@ -238,132 +221,4 @@
             }
         }
     }
-
-    private static interface Node<T> extends Traversable<T>, Splittable<T>, Sized {
-    }
-
-    private static class LeafNode<T> implements Node<T> {
-        private final SizedStreamable<T> data;
-        int size = 0;
-
-        private LeafNode(SizedStreamable<T> data) {
-            this.data = data;
-        }
-
-        @Override
-        public int size() {
-            if (size == 0)
-                size = data.size();
-            return size;
-        }
-
-        @Override
-        public void forEach(Sink<? super T> sink) {
-            data.forEach(sink);
-        }
-
-        @Override
-        public Iterator<T> iterator() {
-            return data.iterator();
-        }
-
-        @Override
-        public Spliterator<T> spliterator() {
-            return Streams.spliterator(data);
-        }
-    }
-
-    private static class InternalNode<T> implements Node<T> {
-        private final Node<T> left, right;
-        int size = 0;
-
-        private InternalNode(Node<T> left, Node<T> right) {
-            this.left = left;
-            this.right = right;
-        }
-
-        @Override
-        public int size() {
-            if (size == 0)
-                size = left.size() + right.size();
-            return size;
-        }
-
-        @Override
-        public void forEach(Sink<? super T> sink) {
-            left.forEach(sink);
-            right.forEach(sink);
-        }
-
-        @Override
-        public Iterator<T> iterator() {
-            return Streams.concatenate(left.iterator(), right.iterator());
-        }
-
-        @Override
-        public Spliterator<T> spliterator() {
-            return new InternalNodeSpliterator<>(this);
-        }
-
-        private static class InternalNodeSpliterator<T> implements Spliterator<T> {
-            private Node<T> cur;
-            private Iterator<T> iterator;
-
-            private InternalNodeSpliterator(InternalNode<T> cur) {
-                this.cur = cur;
-            }
-
-            private Iterator<T> iterator() {
-                if (iterator == null)
-                    iterator = cur.iterator();
-                return iterator;
-            }
-
-            @Override
-            public Spliterator<T> split() {
-                if (iterator != null)
-                    throw new IllegalStateException();
-                if (cur instanceof InternalNode) {
-                    InternalNode<T> internalNode = (InternalNode<T>) cur;
-                    Spliterator<T> ret = internalNode.left.spliterator();
-                    cur = internalNode.right;
-                    return ret;
-                }
-                else
-                    return Streams.emptySpliterator();
-            }
-
-            @Override
-            public void forEach(Sink<? super T> sink) {
-                if (iterator == null) {
-                    cur.forEach(sink);
-                    iterator = Collections.emptyIterator();
-                }
-                else {
-                    while (iterator.hasNext())
-                        sink.accept(iterator.next());
-                }
-            }
-
-            @Override
-            public int getRemainingSizeIfKnown() {
-                return (iterator == null) ? cur.size() : -1;
-            }
-
-            @Override
-            public boolean isPredictableSplits() {
-                return true;
-            }
-
-            @Override
-            public boolean hasNext() {
-                return iterator().hasNext();
-            }
-
-            @Override
-            public T next() {
-                return iterator().next();
-            }
-        }
-    }
 }
--- a/src/share/classes/java/util/streams/ops/EagerOp.java	Mon Aug 20 16:10:37 2012 -0400
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,50 +0,0 @@
-/*
- * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved.
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This code is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 only, as
- * published by the Free Software Foundation.  Oracle designates this
- * particular file as subject to the "Classpath" exception as provided
- * by Oracle in the LICENSE file that accompanied this code.
- *
- * This code is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
- * version 2 for more details (a copy is included in the LICENSE file that
- * accompanied this code).
- *
- * You should have received a copy of the GNU General Public License version
- * 2 along with this work; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
- *
- * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
- * or visit www.oracle.com if you need additional information or have any
- * questions.
- */
-package java.util.streams.ops;
-
-import java.util.Spliterator;
-import java.util.functions.Sink;
-import java.util.streams.ParallelStreamable;
-import java.util.streams.StatefulSink;
-
-/**
- * EagerOp
- *
- * @author Brian Goetz
- */
-public interface EagerOp<T, U> extends ParallelOp<T, U> {
-    public StatefulSink<T, U> sink();
-
-    @Override
-    <V> U computeParallel(ParallelStreamable<V> source, ParallelOpHelper<T, V> helper) default {
-        // dumb default serial version
-        StatefulSink<T, U> sSink = sink();
-        Sink<V> vSink = helper.sink(sSink);
-        Spliterator<V> spliterator = source.spliterator();
-        sSink.begin(spliterator.getRemainingSizeIfKnown());
-        spliterator.forEach(vSink);
-        return sSink.end();
-    }
-}
--- a/src/share/classes/java/util/streams/ops/ElementwiseOp.java	Mon Aug 20 16:10:37 2012 -0400
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,75 +0,0 @@
-/*
- * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved.
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This code is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 only, as
- * published by the Free Software Foundation.  Oracle designates this
- * particular file as subject to the "Classpath" exception as provided
- * by Oracle in the LICENSE file that accompanied this code.
- *
- * This code is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
- * version 2 for more details (a copy is included in the LICENSE file that
- * accompanied this code).
- *
- * You should have received a copy of the GNU General Public License version
- * 2 along with this work; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
- *
- * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
- * or visit www.oracle.com if you need additional information or have any
- * questions.
- */
-package java.util.streams.ops;
-
-import java.util.Iterator;
-import java.util.functions.Sink;
-
-/**
- * An operation performed upon elements.
- *
- * @param <T> Type of input elements to the operation.
- * @param <U> Type of output elements to the operation.
- *
- * @author Brian Goetz
- */
-public interface ElementwiseOp<T,U> {
-
-    /**
-     * Return the resultant state of a stream utilizing this operation
-     * considering the state of the provided stream.
-     *
-     * @param upstreamState stream state of the upstream stream.
-     * @return the resultant state of a stream utilizing this operation
-     * considering the state of the provided stream.
-     */
-    int getStreamState(int upstreamState);
-
-    /**
-     * If {@code true} then operation is stateful and accumulates state.
-     *
-     * @return {@code true} then operation is stateful and accumulates state.
-     */
-    boolean isStateful();
-
-    /**
-     * Return an iterator of the elements of the stream. The operation will be
-     * performed upon each element as it is returned by {@code Iterator.next()}.
-     *
-     * @param in the source stream.
-     * @return an iterator of the elements of the stream.
-     */
-    Iterator<U> iterator(Iterator<T> in);
-
-    /**
-     * Return a sink which will accept elements, perform the operation upon
-     * each element and send it to the provided sink.
-     *
-     * @param sink elements will be sent to this sink after the processing.
-     * @return a sink which will accept elements and perform the operation upon
-     * each element.
-     */
-    Sink<T> sink(Sink<? super U> sink);
-}
--- a/src/share/classes/java/util/streams/ops/FilterOp.java	Mon Aug 20 16:10:37 2012 -0400
+++ b/src/share/classes/java/util/streams/ops/FilterOp.java	Sat Aug 25 19:03:55 2012 -0400
@@ -45,7 +45,7 @@
     }
 
     @Override
-    public int getStreamState(int upstreamState) {
+    public int getStreamFlags(int upstreamState) {
         return upstreamState & ~(Stream.STATE_SIZED | Stream.STATE_UNKNOWN_MASK_V1);
     }
 
--- a/src/share/classes/java/util/streams/ops/FindAnyOp.java	Mon Aug 20 16:10:37 2012 -0400
+++ b/src/share/classes/java/util/streams/ops/FindAnyOp.java	Sat Aug 25 19:03:55 2012 -0400
@@ -26,14 +26,20 @@
 
 import java.util.Iterator;
 import java.util.Optional;
-import java.util.streams.ParallelStreamable;
 
 /**
  * FindAnyOp
  *
  * @author Brian Goetz
  */
-public class FindAnyOp<T> implements ShortCircuitEagerOp<T, Optional<T>> {
+public class FindAnyOp<T> implements ShortCircuitTerminalOp<T, Optional<T>> {
+    private final static FindAnyOp<?> INSTANCE = new FindAnyOp<>();
+
+    @SuppressWarnings("unchecked")
+    public static <T> FindAnyOp<T> singleton() {
+        return (FindAnyOp<T>) INSTANCE;
+    }
+
     @Override
     public Optional<T> evaluate(Iterator<T> iterator) {
         return iterator.hasNext() ? new Optional<>(iterator.next()) : Optional.<T>empty();
--- a/src/share/classes/java/util/streams/ops/FindFirstOp.java	Mon Aug 20 16:10:37 2012 -0400
+++ b/src/share/classes/java/util/streams/ops/FindFirstOp.java	Sat Aug 25 19:03:55 2012 -0400
@@ -26,14 +26,20 @@
 
 import java.util.Iterator;
 import java.util.Optional;
-import java.util.streams.ParallelStreamable;
 
 /**
  * FindFirstOp
  *
  * @author Brian Goetz
  */
-public class FindFirstOp<T> implements ShortCircuitEagerOp<T, Optional<T>> {
+public class FindFirstOp<T> implements ShortCircuitTerminalOp<T, Optional<T>> {
+    private final static FindFirstOp<?> INSTANCE = new FindFirstOp<>();
+
+    @SuppressWarnings("unchecked")
+    public static <T> FindFirstOp<T> singleton() {
+        return (FindFirstOp<T>) INSTANCE;
+    }
+
     @Override
     public Optional<T> evaluate(Iterator<T> iterator) {
         return iterator.hasNext() ? new Optional<>(iterator.next()) : Optional.<T>empty();
--- a/src/share/classes/java/util/streams/ops/FlatMapOp.java	Mon Aug 20 16:10:37 2012 -0400
+++ b/src/share/classes/java/util/streams/ops/FlatMapOp.java	Sat Aug 25 19:03:55 2012 -0400
@@ -47,7 +47,7 @@
     }
 
     @Override
-    public int getStreamState(int upstreamState) {
+    public int getStreamFlags(int upstreamState) {
         return upstreamState & ~(Stream.STATE_SORTED | Stream.STATE_UNIQUE | Stream.STATE_SIZED | Stream.STATE_UNKNOWN_MASK_V1);
     }
 
--- a/src/share/classes/java/util/streams/ops/FoldOp.java	Mon Aug 20 16:10:37 2012 -0400
+++ b/src/share/classes/java/util/streams/ops/FoldOp.java	Sat Aug 25 19:03:55 2012 -0400
@@ -27,7 +27,6 @@
 import java.util.Spliterator;
 import java.util.concurrent.RecursiveTask;
 import java.util.functions.*;
-import java.util.streams.ParallelStreamable;
 import java.util.streams.StatefulSink;
 
 /**
@@ -35,7 +34,7 @@
  *
  * @author Brian Goetz
  */
-public class FoldOp<T, U> implements EagerOp<T, U> {
+public class FoldOp<T, U> implements TerminalOp<T, U> {
     private final Factory<U> seedFactory;
     private final Combiner<U, T, U> reducer;
     private final BinaryOperator<U> combiner;
@@ -79,8 +78,8 @@
     }
 
     @Override
-    public <V> U computeParallel(ParallelStreamable<V> source, ParallelOpHelper<T, V> helper) {
-        return helper.invoke(new ReduceTask<>(helper.suggestDepth(source), source.spliterator(), this, helper));
+    public <V> U computeParallel(ParallelOpHelper<T, V> helper) {
+        return helper.invoke(new ReduceTask<>(helper.suggestDepth(), helper.spliterator(), this, helper));
     }
 
     private static class ReduceTask<T, U, V> extends RecursiveTask<U> {
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/share/classes/java/util/streams/ops/ForEachOp.java	Sat Aug 25 19:03:55 2012 -0400
@@ -0,0 +1,86 @@
+/*
+ * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.  Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+package java.util.streams.ops;
+
+import java.util.Spliterator;
+import java.util.concurrent.RecursiveAction;
+import java.util.functions.Sink;
+import java.util.streams.StatefulSink;
+
+/**
+ * ForEachOp
+ *
+ * @author Brian Goetz
+ */
+public class ForEachOp<T> implements TerminalOp<T,Void> {
+    private final Sink<? super T> sink;
+
+    public ForEachOp(Sink<? super T> sink) {
+        this.sink = sink;
+    }
+
+    @Override
+    public StatefulSink<T, Void> sink() {
+        return (StatefulSink<T, Void>) sink.asStatefulSink();
+    }
+
+    @Override
+    public <V> Void computeParallel(ParallelOpHelper<T, V> helper) {
+        int depth = helper.suggestDepth();
+        Sink<V> compoundSink = helper.sink(sink);
+        Spliterator<V> spliterator = helper.spliterator();
+        if (depth == 0) {
+            spliterator.forEach(compoundSink);
+        } else {
+            helper.invoke(new ForEachTask<>(depth, spliterator, compoundSink));
+        }
+        return null;
+    }
+
+    private static class ForEachTask<T> extends RecursiveAction {
+        private final int depth;
+        private final Spliterator<T> spliterator;
+        private final Sink<T> sink;
+
+        private ForEachTask(int depth, Spliterator<T> spliterator, Sink<T> sink) {
+            this.depth = depth;
+            this.spliterator = spliterator;
+            this.sink = sink;
+        }
+
+        @Override
+        protected void compute() {
+            if (depth == 0) {
+                spliterator.forEach(sink);
+            } else {
+                ForEachTask<T> left = new ForEachTask<>(depth - 1, spliterator.split(), sink);
+                ForEachTask<T> right = new ForEachTask<>(depth - 1, spliterator, sink);
+                right.fork();
+                left.compute();
+                right.join();
+            }
+        }
+    }
+}
--- a/src/share/classes/java/util/streams/ops/GroupByOp.java	Mon Aug 20 16:10:37 2012 -0400
+++ b/src/share/classes/java/util/streams/ops/GroupByOp.java	Sat Aug 25 19:03:55 2012 -0400
@@ -37,7 +37,7 @@
  *
  * @author Brian Goetz
  */
-public class GroupByOp<T, K> implements EagerOp<T, Map<K, SizedStreamable<T>>> {
+public class GroupByOp<T, K> implements TerminalOp<T, Map<K, Streamable<T>>> {
     private final Mapper<? super T, ? extends K> mapper;
 
     public GroupByOp(Mapper<? super T, ? extends K> mapper) {
@@ -45,8 +45,8 @@
     }
 
     @Override
-    public StatefulSink<T, Map<K, SizedStreamable<T>>> sink() {
-        return new StatefulSink<T, Map<K, SizedStreamable<T>>>() {
+    public StatefulSink<T, Map<K, Streamable<T>>> sink() {
+        return new StatefulSink<T, Map<K, Streamable<T>>>() {
             private final Map<K, StreamBuilder<T>> map = new HashMap<>();
 
             @Override
@@ -55,7 +55,7 @@
             }
 
             @Override
-            public Map<K, SizedStreamable<T>> end() {
+            public Map<K, Streamable<T>> end() {
                 return (Map) map;
             }
 
--- a/src/share/classes/java/util/streams/ops/IdOp.java	Mon Aug 20 16:10:37 2012 -0400
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,53 +0,0 @@
-/*
- * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved.
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This code is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 only, as
- * published by the Free Software Foundation.  Oracle designates this
- * particular file as subject to the "Classpath" exception as provided
- * by Oracle in the LICENSE file that accompanied this code.
- *
- * This code is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
- * version 2 for more details (a copy is included in the LICENSE file that
- * accompanied this code).
- *
- * You should have received a copy of the GNU General Public License version
- * 2 along with this work; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
- *
- * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
- * or visit www.oracle.com if you need additional information or have any
- * questions.
- */
-package java.util.streams.ops;
-
-import java.util.Iterator;
-import java.util.functions.Sink;
-import java.util.streams.Stream;
-
-/**
- * IdOp
- *
- * @author Brian Goetz
- */
-public class IdOp<T> implements StatelessOp<T,T> {
-    @Override
-    public int getStreamState(int upstreamState) {
-        return upstreamState & ~Stream.STATE_UNKNOWN_MASK_V1;
-    }
-
-    @Override
-    public Iterator<T> iterator(Iterator<T> in) {
-        return in;
-    }
-
-    @Override
-    public Sink<T> sink(final Sink<? super T> sink) {
-        return (Sink<T>) sink;
-    }
-
-    // No need to support fused operations; cheaper to just call iterator/sink
-}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/share/classes/java/util/streams/ops/IntermediateOp.java	Sat Aug 25 19:03:55 2012 -0400
@@ -0,0 +1,86 @@
+/*
+ * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.  Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+package java.util.streams.ops;
+
+import java.util.Iterator;
+import java.util.functions.Sink;
+import java.util.streams.Stream;
+
+/**
+ * An operation performed upon elements.
+ *
+ * @param <T> Type of input elements to the operation.
+ * @param <U> Type of output elements to the operation.
+ *
+ * @author Brian Goetz
+ */
+public interface IntermediateOp<T,U> {
+
+    /**
+     * Return the resultant state of a stream utilizing this operation
+     * considering the state of the provided stream.
+     *
+     * @param upstreamState stream state of the upstream stream.
+     * @return the resultant state of a stream utilizing this operation
+     * considering the state of the provided stream.
+     */
+    int getStreamFlags(int upstreamState);
+
+    /**
+     * If {@code true} then operation is stateful and accumulates state.
+     *
+     * @return {@code true} then operation is stateful and accumulates state.
+     */
+    boolean isStateful();
+
+    /**
+     * If {@code true}, then evaluation of the pipeline is constrained to be
+     * pull-oriented.  This is true for operations that may truncate or otherwise
+     * manipulate the stream contents.
+     */
+    boolean isShortCircuit() default { return false; }
+
+    /**
+     * Return an iterator of the elements of the stream. The operation will be
+     * performed upon each element as it is returned by {@code Iterator.next()}.
+     *
+     * @param in the source stream.
+     * @return an iterator of the elements of the stream.
+     */
+    Iterator<U> iterator(Iterator<T> in);
+
+    /**
+     * Return a sink which will accept elements, perform the operation upon
+     * each element and send it to the provided sink.
+     *
+     * @param sink elements will be sent to this sink after the processing.
+     * @return a sink which will accept elements and perform the operation upon
+     * each element.
+     */
+    Sink<T> sink(Sink<? super U> sink);
+
+    Stream.Shape inputShape() default { return Stream.Shape.LINEAR; }
+    Stream.Shape outputShape() default { return Stream.Shape.LINEAR; }
+}
--- a/src/share/classes/java/util/streams/ops/MapExtractKeysOp.java	Mon Aug 20 16:10:37 2012 -0400
+++ b/src/share/classes/java/util/streams/ops/MapExtractKeysOp.java	Sat Aug 25 19:03:55 2012 -0400
@@ -37,7 +37,7 @@
  */
 public class MapExtractKeysOp<K,V> implements StatelessOp<Mapping<K,V>, K> {
     @Override
-    public int getStreamState(int upstreamState) {
+    public int getStreamFlags(int upstreamState) {
         return upstreamState & ~Stream.STATE_UNKNOWN_MASK_V1;
     }
 
--- a/src/share/classes/java/util/streams/ops/MapExtractValuesOp.java	Mon Aug 20 16:10:37 2012 -0400
+++ b/src/share/classes/java/util/streams/ops/MapExtractValuesOp.java	Sat Aug 25 19:03:55 2012 -0400
@@ -37,7 +37,7 @@
  */
 public class MapExtractValuesOp<K,V> implements StatelessOp<Mapping<K,V>, V> {
     @Override
-    public int getStreamState(int upstreamState) {
+    public int getStreamFlags(int upstreamState) {
         return upstreamState & ~Stream.STATE_UNKNOWN_MASK_V1;
     }
 
--- a/src/share/classes/java/util/streams/ops/MapFilterKeysOp.java	Mon Aug 20 16:10:37 2012 -0400
+++ b/src/share/classes/java/util/streams/ops/MapFilterKeysOp.java	Sat Aug 25 19:03:55 2012 -0400
@@ -45,7 +45,7 @@
     }
 
     @Override
-    public int getStreamState(int upstreamState) {
+    public int getStreamFlags(int upstreamState) {
         return upstreamState & ~(Stream.STATE_SIZED | Stream.STATE_UNKNOWN_MASK_V1);
     }
 
--- a/src/share/classes/java/util/streams/ops/MapFilterValuesOp.java	Mon Aug 20 16:10:37 2012 -0400
+++ b/src/share/classes/java/util/streams/ops/MapFilterValuesOp.java	Sat Aug 25 19:03:55 2012 -0400
@@ -45,7 +45,7 @@
     }
 
     @Override
-    public int getStreamState(int upstreamState) {
+    public int getStreamFlags(int upstreamState) {
         return upstreamState & ~(Stream.STATE_SIZED | Stream.STATE_UNKNOWN_MASK_V1);
     }
 
--- a/src/share/classes/java/util/streams/ops/MapMapValuesOp.java	Mon Aug 20 16:10:37 2012 -0400
+++ b/src/share/classes/java/util/streams/ops/MapMapValuesOp.java	Sat Aug 25 19:03:55 2012 -0400
@@ -49,7 +49,7 @@
     }
 
     @Override
-    public int getStreamState(int upstreamState) {
+    public int getStreamFlags(int upstreamState) {
         return upstreamState & ~(Stream.STATE_UNKNOWN_MASK_V1);
     }
 
--- a/src/share/classes/java/util/streams/ops/MapOp.java	Mon Aug 20 16:10:37 2012 -0400
+++ b/src/share/classes/java/util/streams/ops/MapOp.java	Sat Aug 25 19:03:55 2012 -0400
@@ -43,7 +43,7 @@
     }
 
     @Override
-    public int getStreamState(int upstreamState) {
+    public int getStreamFlags(int upstreamState) {
         return upstreamState & ~(Stream.STATE_SORTED | Stream.STATE_UNIQUE | Stream.STATE_UNKNOWN_MASK_V1);
     }
 
--- a/src/share/classes/java/util/streams/ops/MapSortedOp.java	Mon Aug 20 16:10:37 2012 -0400
+++ b/src/share/classes/java/util/streams/ops/MapSortedOp.java	Sat Aug 25 19:03:55 2012 -0400
@@ -26,7 +26,6 @@
 
 import java.util.*;
 import java.util.functions.Sink;
-import java.util.streams.ParallelStreamable;
 import java.util.streams.StatefulSink;
 import java.util.streams.Stream;
 
@@ -64,7 +63,7 @@
     }
 
     @Override
-    public int getStreamState(int upstreamState) {
+    public int getStreamFlags(int upstreamState) {
         return upstreamState | Stream.STATE_SORTED;
     }
 
@@ -101,7 +100,7 @@
     private static <K,V> MapIterator<K,V> iterator(Iterator<Mapping<K, V>> iterator, Comparator<? super Mapping<K,V>> comparator) {
         Objects.requireNonNull(iterator);
         Objects.requireNonNull(comparator);
-        final PriorityQueue<? super Mapping<K,V>> pq = new PriorityQueue<>(DEFAULT_PRIORITY_QUEUE_SIZE, comparator);
+        final PriorityQueue<Mapping<K,V>> pq = new PriorityQueue<>(DEFAULT_PRIORITY_QUEUE_SIZE, comparator);
         while (iterator.hasNext()) {
             pq.add(iterator.next());
         }
--- a/src/share/classes/java/util/streams/ops/MapSwapOp.java	Mon Aug 20 16:10:37 2012 -0400
+++ b/src/share/classes/java/util/streams/ops/MapSwapOp.java	Sat Aug 25 19:03:55 2012 -0400
@@ -47,7 +47,7 @@
     }
 
     @Override
-    public int getStreamState(int upstreamState) {
+    public int getStreamFlags(int upstreamState) {
         return upstreamState & ~(Stream.STATE_UNIQUE | Stream.STATE_SORTED | Stream.STATE_UNKNOWN_MASK_V1);
     }
 
--- a/src/share/classes/java/util/streams/ops/MappedOp.java	Mon Aug 20 16:10:37 2012 -0400
+++ b/src/share/classes/java/util/streams/ops/MappedOp.java	Sat Aug 25 19:03:55 2012 -0400
@@ -46,7 +46,7 @@
     }
 
     @Override
-    public int getStreamState(int upstreamState) {
+    public int getStreamFlags(int upstreamState) {
         return upstreamState & ~Stream.STATE_UNKNOWN_MASK_V1;
     }
 
--- a/src/share/classes/java/util/streams/ops/NoneMatchOp.java	Mon Aug 20 16:10:37 2012 -0400
+++ b/src/share/classes/java/util/streams/ops/NoneMatchOp.java	Sat Aug 25 19:03:55 2012 -0400
@@ -27,14 +27,13 @@
 import java.util.Iterator;
 import java.util.Objects;
 import java.util.functions.Predicate;
-import java.util.streams.ParallelStreamable;
 
 /**
  * NoneMatchOp
  *
  * @author Brian Goetz
  */
-public class NoneMatchOp<T> implements ShortCircuitEagerOp<T, Boolean> {
+public class NoneMatchOp<T> implements ShortCircuitTerminalOp<T, Boolean> {
     private final Predicate<? super T> predicate;
 
     public NoneMatchOp(Predicate<? super T> predicate) {
@@ -58,7 +57,7 @@
     }
 
     @Override
-    public <V> Boolean computeParallel(ParallelStreamable<V> source, ParallelOpHelper<T, V> helper) {
+    public <V> Boolean computeParallel(ParallelOpHelper<T, V> helper) {
         throw new UnsupportedOperationException("nyi");
     }
 }
--- a/src/share/classes/java/util/streams/ops/ParallelOp.java	Mon Aug 20 16:10:37 2012 -0400
+++ b/src/share/classes/java/util/streams/ops/ParallelOp.java	Sat Aug 25 19:03:55 2012 -0400
@@ -25,9 +25,9 @@
 package java.util.streams.ops;
 
 import java.util.Iterator;
+import java.util.Spliterator;
 import java.util.concurrent.ForkJoinTask;
 import java.util.functions.Sink;
-import java.util.streams.ParallelStreamable;
 
 /**
  * ParallelOp
@@ -38,9 +38,11 @@
  * @author Brian Goetz
  */
 public interface ParallelOp<T, U> {
+
     public interface ParallelOpHelper<T, V> {
-        public int suggestDepth(ParallelStreamable<V> source);
-        public Sink<V> sink(Sink<T> sink);
+        public int suggestDepth();
+        public Spliterator<V> spliterator();
+        public Sink<V> sink(Sink<? super T> sink);
         public Iterator<T> iterator();
         public<Z> Z invoke(ForkJoinTask<Z> task);
     }
@@ -49,9 +51,8 @@
      * Compute the result of the operation in parallel and return the result.
      *
      * @param <V>
-     * @param source element source
      * @param helper
      * @return result of the operation.
      */
-    <V> U computeParallel(ParallelStreamable<V> source, ParallelOpHelper<T, V> helper);
+    <V> U computeParallel(ParallelOpHelper<T, V> helper);
 }
--- a/src/share/classes/java/util/streams/ops/ShortCircuitEagerOp.java	Mon Aug 20 16:10:37 2012 -0400
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,54 +0,0 @@
-/*
- * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved.
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This code is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 only, as
- * published by the Free Software Foundation.  Oracle designates this
- * particular file as subject to the "Classpath" exception as provided
- * by Oracle in the LICENSE file that accompanied this code.
- *
- * This code is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
- * version 2 for more details (a copy is included in the LICENSE file that
- * accompanied this code).
- *
- * You should have received a copy of the GNU General Public License version
- * 2 along with this work; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
- *
- * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
- * or visit www.oracle.com if you need additional information or have any
- * questions.
- */
-package java.util.streams.ops;
-
-import java.util.Iterator;
-import java.util.streams.*;
-
-/**
- * ShortCircuitEagerOp
- *
- * @param <T> Type of elements.
- * @param <U> Type of result.
- *
- * @author Brian Goetz
- */
-public interface ShortCircuitEagerOp<T, U> extends ParallelOp<T, U> {
-    /**
-     * Evaluate some portion of the elements and return result. The
-     * implementation need not traverse all elements of the iterator.
-     *
-     * @param iterator sequence of elements to be evaluated.
-     * @return
-     */
-    public U evaluate(Iterator<T> iterator);
-
-    @Override
-    <V> U computeParallel(ParallelStreamable<V> source, ParallelOpHelper<T, V> helper) default {
-        // dumb default serial version
-        System.out.println("Using ShortCircuitEagerOp.computeParallel serial default");
-        return evaluate(helper.iterator());
-    }
-}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/share/classes/java/util/streams/ops/ShortCircuitTerminalOp.java	Sat Aug 25 19:03:55 2012 -0400
@@ -0,0 +1,60 @@
+/*
+ * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.  Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+package java.util.streams.ops;
+
+import java.util.Iterator;
+import java.util.streams.*;
+
+/**
+ * ShortCircuitEagerOp
+ *
+ * @param <T> Type of elements.
+ * @param <U> Type of result.
+ *
+ * @author Brian Goetz
+ */
+public interface ShortCircuitTerminalOp<T, U> extends TerminalOp<T, U> {
+    /**
+     * Evaluate some portion of the elements and return result. The
+     * implementation need not traverse all elements of the iterator.
+     *
+     * @param iterator sequence of elements to be evaluated.
+     * @return
+     */
+    public U evaluate(Iterator<T> iterator);
+
+    @Override
+    StatefulSink<T, U> sink() default { throw new UnsupportedOperationException(); }
+
+    @Override
+    boolean isShortCircuit() default { return true; }
+
+    @Override
+    <V> U computeParallel(ParallelOpHelper<T, V> helper) default {
+        // dumb default serial version
+        System.out.println("Using ShortCircuitEagerOp.computeParallel serial default");
+        return evaluate(helper.iterator());
+    }
+}
--- a/src/share/classes/java/util/streams/ops/SortedOp.java	Mon Aug 20 16:10:37 2012 -0400
+++ b/src/share/classes/java/util/streams/ops/SortedOp.java	Sat Aug 25 19:03:55 2012 -0400
@@ -114,7 +114,7 @@
     }
 
     @Override
-    public int getStreamState(int upstreamState) {
+    public int getStreamFlags(int upstreamState) {
         return upstreamState | Stream.STATE_SORTED;
     }
 }
--- a/src/share/classes/java/util/streams/ops/StatefulOp.java	Mon Aug 20 16:10:37 2012 -0400
+++ b/src/share/classes/java/util/streams/ops/StatefulOp.java	Sat Aug 25 19:03:55 2012 -0400
@@ -12,7 +12,7 @@
  * @param <V> Type of terminal state.
  *
  */
-public interface StatefulOp<T, U, V> extends ElementwiseOp<T,U>, ParallelOp<T, ParallelStreamable<U>> {
+public interface StatefulOp<T, U, V> extends IntermediateOp<T,U>, ParallelOp<T, TreeUtils.Node<U>> {
 
     @Override
     public abstract StatefulSink<T, V> sink(Sink<? super U> sink);
@@ -23,31 +23,15 @@
     }
 
     @Override
-    <Z> ParallelStreamable<U> computeParallel(ParallelStreamable<Z> source,
-                                              ParallelOpHelper<T, Z> helper) default {
+    <Z> TreeUtils.Node<U> computeParallel(ParallelOpHelper<T, Z> helper) default {
         // dumb default serial implementation
         final StreamBuilder<U> sb = StreamBuilders.make();
         StatefulSink<T, V> sSink = sink(sb);
         Sink<Z> sink = helper.sink(sSink);
-        Spliterator<Z> spliterator = source.spliterator();
+        Spliterator<Z> spliterator = helper.spliterator();
         sSink.begin(spliterator.getRemainingSizeIfKnown());
         spliterator.forEach(sink);
         sSink.end();
-        return new ParallelStreamable<U>() {
-            @Override
-            public int size() {
-                return sb.size();
-            }
-
-            @Override
-            public Iterator<U> iterator() {
-                return sb.iterator();
-            }
-
-            @Override
-            public Spliterator<U> spliterator() {
-                return Streams.spliterator(sb);
-            }
-        };
+        return TreeUtils.node(sb);
     }
 }
--- a/src/share/classes/java/util/streams/ops/StatelessOp.java	Mon Aug 20 16:10:37 2012 -0400
+++ b/src/share/classes/java/util/streams/ops/StatelessOp.java	Sat Aug 25 19:03:55 2012 -0400
@@ -3,7 +3,7 @@
 /**
  * StreamOp
  */
-public interface StatelessOp<T, U> extends ElementwiseOp<T, U> {
+public interface StatelessOp<T, U> extends IntermediateOp<T, U> {
     @Override
     public boolean isStateful() default {
         return false;
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/share/classes/java/util/streams/ops/TerminalOp.java	Sat Aug 25 19:03:55 2012 -0400
@@ -0,0 +1,71 @@
+/*
+ * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.  Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+package java.util.streams.ops;
+
+import java.util.Iterator;
+import java.util.Spliterator;
+import java.util.functions.Sink;
+import java.util.streams.StatefulSink;
+
+/**
+ * EagerOp
+ *
+ * @author Brian Goetz
+ */
+public interface TerminalOp<T, U> extends ParallelOp<T, U> {
+    /**
+     * Evaluate some portion of the elements and return result. The
+     * implementation need not traverse all elements of the iterator.
+     *
+     * @param iterator sequence of elements to be evaluated.
+     * @return
+     */
+    public U evaluate(Iterator<T> iterator) default {
+        StatefulSink<T,U> sink = sink();
+        sink.begin(-1);
+        while (iterator.hasNext())
+            sink.accept(iterator.next());
+        return sink.end();
+    }
+
+    public StatefulSink<T, U> sink();
+
+    public boolean isShortCircuit() default { return false; }
+
+    @Override
+    <V> U computeParallel(ParallelOpHelper<T, V> helper) default {
+        // dumb default serial version
+        StatefulSink<T, U> sSink = sink();
+        Sink<V> vSink = helper.sink(sSink);
+        Spliterator<V> spliterator = helper.spliterator();
+        sSink.begin(spliterator.getRemainingSizeIfKnown());
+        // @@@ This is ridiculous!  Integrate Sink/StatefulSink functionality in a less hacky way
+        if (sSink == vSink)
+            spliterator.forEach(vSink);
+        else
+            TreeUtils.forEach(spliterator, vSink);
+        return sSink.end();
+    }
+}
--- a/src/share/classes/java/util/streams/ops/ToArrayOp.java	Mon Aug 20 16:10:37 2012 -0400
+++ b/src/share/classes/java/util/streams/ops/ToArrayOp.java	Sat Aug 25 19:03:55 2012 -0400
@@ -25,7 +25,6 @@
 package java.util.streams.ops;
 
 import java.util.Arrays;
-import java.util.streams.ParallelStreamable;
 import java.util.streams.StatefulSink;
 
 /**
@@ -33,14 +32,16 @@
  * <p/>
  * @author Brian Goetz
  */
-public class ToArrayOp<T> implements EagerOp<T, Object[]> {
+public class ToArrayOp<T> implements TerminalOp<T, Object[]> {
 
     private final static ToArrayOp<?> INSTANCE = new ToArrayOp<>();
 
+    @SuppressWarnings("unchecked")
     public static <T> ToArrayOp<T> singleton() {
         return (ToArrayOp<T>) INSTANCE;
     }
 
+
     @Override
     public StatefulSink<T, Object[]> sink() {
         return new StatefulSink<T, Object[]>() {
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/share/classes/java/util/streams/ops/TreeUtils.java	Sat Aug 25 19:03:55 2012 -0400
@@ -0,0 +1,438 @@
+/*
+ * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.  Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+package java.util.streams.ops;
+
+import java.util.*;
+import java.util.concurrent.RecursiveAction;
+import java.util.concurrent.RecursiveTask;
+import java.util.functions.Sink;
+import java.util.streams.*;
+
+/**
+ * Collector
+ *
+ * @author Brian Goetz
+ */
+public class TreeUtils {
+    // @@@ This method is a hack, in that helper.sink() might serve up a stateful sink.
+    // This logic should be more tightly reflected in the helper rather than expecting every client to
+    // do instanceof and casts.
+    static<T> void forEach(Spliterator<T> spliterator, Sink<? super T> sink) {
+        if (sink instanceof StatefulSink) {
+            StatefulSink<T, ?> sSink = (StatefulSink<T, ?>) sink;
+            sSink.begin(spliterator.getRemainingSizeIfKnown());
+            spliterator.forEach(sSink);
+            sSink.end();
+        }
+        else {
+            spliterator.forEach(sink);
+        }
+    }
+
+    public static<T, U> Node<T> collect(ParallelOp.ParallelOpHelper<T, U> helper,
+                                       boolean flattenLeaves,
+                                       boolean flattenTree) {
+        int depth = helper.suggestDepth();
+        Spliterator<U> spliterator = helper.spliterator();
+        int size = spliterator.getRemainingSizeIfKnown();
+        boolean splitSizesKnown = false;
+        if (depth == 0) {
+            StreamBuilder<T> builder;
+            // @@@ splitSizesKnown should come from helper
+            // Need to account for SIZED flag from pipeline
+            if (size != -1 && splitSizesKnown) {
+                builder = StreamBuilders.makeFixed(size);
+                forEach(spliterator, helper.sink(builder));
+                return node((T[]) builder.toArray());
+            }
+            else {
+                builder = StreamBuilders.make();
+                forEach(spliterator, helper.sink(builder));
+                return node(builder);
+            }
+        }
+        else {
+            // @@@ splitSizesKnown should come from helper
+            // Need to account for SIZED flag from pipeline
+            if (size != -1 && spliterator.isPredictableSplits() && splitSizesKnown) {
+                T[] array = (T[]) new Object[size];
+                helper.invoke(new SizedCollectorTask<>(depth, spliterator, helper, array, 0, size));
+                return node(array);
+            }
+            else {
+                Node<T> node = helper.invoke(new CollectorTask<>(depth, spliterator, helper));
+                if (flattenTree) {
+                    T[] array = (T[]) new Object[node.size()];
+                    helper.invoke(new ToArrayTask<>(node, array, 0));
+                    return node(array);
+                }
+                return node;
+            }
+        }
+    }
+
+    public static class CollectorOp<T> implements TerminalOp<T, TreeUtils.Node<T>> {
+        @Override
+        public StatefulSink<T, TreeUtils.Node<T>> sink() {
+            throw new UnsupportedOperationException();
+        }
+
+        private static CollectorOp<?> INSTANCE = new CollectorOp<>();
+
+        public static <T> CollectorOp<T> singleton() {
+            return (CollectorOp<T>) INSTANCE;
+        }
+
+        @Override
+        public <V> TreeUtils.Node<T> computeParallel(ParallelOpHelper<T, V> helper) {
+            return TreeUtils.collect(helper, false, false);
+        }
+    }
+
+    private static class CollectorTask<T, U> extends RecursiveTask<Node<U>> {
+        private final int depth;
+        private final Spliterator<T> spliterator;
+        private ParallelOp.ParallelOpHelper<U, T> helper;
+
+        public CollectorTask(int depth, Spliterator<T> spliterator, ParallelOp.ParallelOpHelper<U, T> helper) {
+            this.depth = depth;
+            this.spliterator = spliterator;
+            this.helper = helper;
+        }
+
+        @Override
+        protected Node<U> compute() {
+            if (depth == 0) {
+                // @@@ Usual comment about using fixed stream builders if we know enough to do so
+                StreamBuilder<U> builder = StreamBuilders.make();
+                spliterator.forEach(helper.sink(builder));
+                return node(builder);
+            }
+            else {
+                CollectorTask<T, U> left = new CollectorTask<>(depth-1, spliterator.split(), helper);
+                CollectorTask<T, U> right = new CollectorTask<>(depth-1, spliterator, helper);
+                right.fork();
+                Node<U> leftResult = left.compute();
+                Node<U> rightResult = right.join();
+                return node(leftResult, rightResult);
+            }
+        }
+    }
+
+    private static class SizedCollectorTask<T, U> extends RecursiveAction {
+        private final int depth;
+        private final Spliterator<T> spliterator;
+        private ParallelOp.ParallelOpHelper<U, T> helper;
+        private final U[] array;
+        private int offset;
+        private int length;
+
+        private SizedCollectorTask(int depth, Spliterator<T> spliterator, ParallelOp.ParallelOpHelper<U, T> helper, U[] array, int offset, int length) {
+            this.depth = depth;
+            this.spliterator = spliterator;
+            this.helper = helper;
+            this.array = array;
+            this.offset = offset;
+            this.length = length;
+        }
+
+        @Override
+        protected void compute() {
+            if (depth == 0) {
+                spliterator.forEach(helper.sink(Arrays.sink(array, offset, length)));
+            }
+            else {
+                Spliterator<T> split = spliterator.split();
+                int leftSize = split.getRemainingSizeIfKnown();
+                assert(leftSize != -1);
+                SizedCollectorTask<T, U> left = new SizedCollectorTask<>(depth-1, split, helper, array, offset, leftSize);
+                SizedCollectorTask<T, U> right = new SizedCollectorTask<>(depth-1, spliterator, helper, array, offset+leftSize, length-leftSize);
+                right.fork();
+                left.compute();
+                right.join();
+            }
+        }
+    }
+
+    private static class ToArrayTask<T> extends RecursiveAction {
+        private final T[] array;
+        private final Node<T> node;
+        private final int offset;
+
+        private ToArrayTask(Node<T> node, T[] array, int offset) {
+            this.array = array;
+            this.node = node;
+            this.offset = offset;
+        }
+
+        @Override
+        protected void compute() {
+            if (node instanceof InternalNode) {
+                InternalNode<T> n = (InternalNode<T>) node;
+                Node<T> left = n.left();
+                ToArrayTask<T> leftTask = new ToArrayTask<>(left, array, offset);
+                ToArrayTask<T> rightTask = new ToArrayTask<>(n.right(), array, offset + left.size());
+                rightTask.fork();
+                leftTask.compute();
+                rightTask.join();
+            }
+            else {
+                node.copyTo(array, offset);
+            }
+        }
+    }
+
+    public static interface Node<T> extends Traversable<T>, Splittable<T>, Sized {
+        void copyTo(T[] array, int offset);
+    }
+
+    public static interface InternalNode<T> extends Node<T> {
+        Node<T> left();
+        Node<T> right();
+    }
+
+    public static<T> Node<T> node(final T[] array) {
+        return new Node<T>() {
+            private final T[] data = array;
+
+            @Override
+            public int size() {
+                return data.length;
+            }
+
+            @Override
+            public void forEach(Sink<? super T> sink) {
+                for (T t : data)
+                    sink.accept(t);
+            }
+
+            @Override
+            public Iterator<T> iterator() {
+                return Arrays.iterator(data, 0, data.length);
+            }
+
+            @Override
+            public Spliterator<T> spliterator() {
+                return Arrays.spliterator(data, 0, data.length);
+            }
+
+            @Override
+            public void copyTo(T[] dest, int destOffset) {
+                System.arraycopy(data, 0, dest, destOffset, data.length);
+            }
+
+            @Override
+            public String toString() {
+                return "ArrayNode" + Arrays.toString(data);
+            }
+        };
+    }
+
+    public static<T> Node<T> node(final StreamBuilder<T> stream) {
+        return new Node<T>() {
+            private final StreamBuilder<T> data = stream;
+
+            @Override
+            public int size() {
+                return data.size();
+            }
+
+            @Override
+            public void forEach(Sink<? super T> sink) {
+                data.forEach(sink);
+            }
+
+            @Override
+            public Iterator<T> iterator() {
+                return data.iterator();
+            }
+
+            @Override
+            public Spliterator<T> spliterator() {
+                return Arrays.spliterator((T[]) data.toArray());
+            }
+
+            @Override
+            public void copyTo(T[] array, int offset) {
+                int i = 0;
+                for (T t : this)
+                    array[offset+(i++)] = t;
+            }
+
+            @Override
+            public String toString() {
+                return "SBNode" + data.toString();
+            }
+        };
+    }
+
+    public static<T> Node<T> node(Node<T> left, Node<T> right) {
+        return new InternalNodeImpl<>(left, right);
+    }
+
+    private static class InternalNodeImpl<T> implements InternalNode<T> {
+        private final Node<T> left, right;
+        int size = 0;
+
+        private InternalNodeImpl(Node<T> left, Node<T> right) {
+            this.left = left;
+            this.right = right;
+        }
+
+        @Override
+        public Node<T> left() {
+            return left;
+        }
+
+        @Override
+        public Node<T> right() {
+            return right;
+        }
+
+        @Override
+        public int size() {
+            if (size == 0)
+                size = left.size() + right.size();
+            return size;
+        }
+
+        @Override
+        public void forEach(Sink<? super T> sink) {
+            left.forEach(sink);
+            right.forEach(sink);
+        }
+
+        @Override
+        public Iterator<T> iterator() {
+            return concatenate(left.iterator(), right.iterator());
+        }
+
+        @Override
+        public Spliterator<T> spliterator() {
+            return new InternalNodeSpliterator<>(this);
+        }
+
+        @Override
+        public void copyTo(T[] array, int offset) {
+            left.copyTo(array, offset);
+            right.copyTo(array, offset+left.size());
+        }
+
+        @Override
+        public String toString() {
+            return String.format("IntNode[%s,%s]", left.toString(), right.toString());
+        }
+
+        private static class InternalNodeSpliterator<T> implements Spliterator<T> {
+            private Node<T> cur;
+            private Iterator<T> iterator;
+
+            private InternalNodeSpliterator(InternalNodeImpl<T> cur) {
+                this.cur = cur;
+            }
+
+            private Iterator<T> iterator() {
+                if (iterator == null)
+                    iterator = cur.iterator();
+                return iterator;
+            }
+
+            @Override
+            public Spliterator<T> split() {
+                if (iterator != null)
+                    throw new IllegalStateException();
+                if (cur instanceof InternalNodeImpl) {
+                    InternalNodeImpl<T> internalNode = (InternalNodeImpl<T>) cur;
+                    Spliterator<T> ret = internalNode.left.spliterator();
+                    cur = internalNode.right;
+                    return ret;
+                }
+                else
+                    return Streams.emptySpliterator();
+            }
+
+            @Override
+            public void forEach(Sink<? super T> sink) {
+                if (iterator == null) {
+                    cur.forEach(sink);
+                    iterator = Collections.emptyIterator();
+                }
+                else {
+                    while (iterator.hasNext())
+                        sink.accept(iterator.next());
+                }
+            }
+
+            @Override
+            public int getRemainingSizeIfKnown() {
+                return (iterator == null) ? cur.size() : -1;
+            }
+
+            @Override
+            public boolean isPredictableSplits() {
+                return true;
+            }
+
+            @Override
+            public boolean hasNext() {
+                return iterator().hasNext();
+            }
+
+            @Override
+            public T next() {
+                return iterator().next();
+            }
+        }
+    }
+
+    public static<T> Iterator<T> concatenate(final Iterator<? extends T> left, final Iterator<? extends T> right) {
+        return new Iterator<T>() {
+            private Iterator<? extends T> it = left;
+            private boolean switched = false;
+
+            @Override
+            public boolean hasNext() {
+                if (it.hasNext()) {
+                    return true;
+                } else if (switched) {
+                    return false;
+                } else {
+                    switched = true;
+                    it = right;
+                    return it.hasNext();
+                }
+            }
+
+            @Override
+            public T next() {
+                if (hasNext()) {
+                    return it.next();
+                } else {
+                    throw new NoSuchElementException();
+                }
+            }
+        };
+    }
+}
--- a/src/share/classes/java/util/streams/ops/UniqOp.java	Mon Aug 20 16:10:37 2012 -0400
+++ b/src/share/classes/java/util/streams/ops/UniqOp.java	Sat Aug 25 19:03:55 2012 -0400
@@ -26,7 +26,6 @@
 
 import java.util.*;
 import java.util.functions.Sink;
-import java.util.streams.ParallelStreamable;
 import java.util.streams.StatefulSink;
 import java.util.streams.Stream;
 
@@ -48,7 +47,7 @@
     }
 
     @Override
-    public int getStreamState(int upstreamState) {
+    public int getStreamFlags(int upstreamState) {
         // If the upstream is sorted, we need only cache last element
         // If the upstream is unique, this is a no-op
         return upstreamState & ~(Stream.STATE_SIZED | Stream.STATE_UNKNOWN_MASK_V1) | Stream.STATE_UNIQUE;
--- a/test-ng/build.xml	Mon Aug 20 16:10:37 2012 -0400
+++ b/test-ng/build.xml	Sat Aug 25 19:03:55 2012 -0400
@@ -37,7 +37,7 @@
 
     <target name="test" depends="test-compile" >
         <echo>Results at: file:${test.reports.dir}/index.html</echo>
-        <testng classpathref="test.class.path" outputdir="${test.reports.dir}">
+        <testng classpathref="test.class.path" outputdir="${test.reports.dir}" >
             <classfileset dir="${test.classes.dir}" includes="**/${test.pattern}.class"/>
             <jvmarg value="-ea" />
             <jvmarg value="-esa" />
--- a/test-ng/tests/org/openjdk/tests/java/lang/ThreadLocalFactoryTest.java	Mon Aug 20 16:10:37 2012 -0400
+++ b/test-ng/tests/org/openjdk/tests/java/lang/ThreadLocalFactoryTest.java	Sat Aug 25 19:03:55 2012 -0400
@@ -77,7 +77,7 @@
     public ThreadLocalFactoryTest() {
     }
 
-    public void test() throws InterruptedException, Exception {
+    public void test() throws Exception {
         int threadCount = 500;
         Thread th[] = new Thread[threadCount];
         final boolean visited[] = new boolean[threadCount];
@@ -87,11 +87,11 @@
             th[i] = new Thread() {
                 @Override
                 public void run() {
-                    int threadId = ((Integer) (threadLocal.get())).intValue();
+                    int threadId = threadLocal.get();
                     assertFalse(visited[threadId], "visited[" + threadId + "]=" + visited[threadId]);
                     visited[threadId] = true;
                     // check the get() again
-                    int secondCheckThreadId = ((Integer) (threadLocal.get())).intValue();
+                    int secondCheckThreadId = threadLocal.get();
                     assertEquals( secondCheckThreadId, threadId );
                     Thread.yield();
                 }
--- a/test-ng/tests/org/openjdk/tests/java/lang/ThreadLocalTest.java	Mon Aug 20 16:10:37 2012 -0400
+++ b/test-ng/tests/org/openjdk/tests/java/lang/ThreadLocalTest.java	Sat Aug 25 19:03:55 2012 -0400
@@ -43,7 +43,7 @@
      * Test of get method, of class ThreadLocal.
      */
     public void testGet() {
-        ThreadLocal instance = new ThreadLocal();
+        ThreadLocal<?> instance = new ThreadLocal<>();
         Object expResult = null;
         Object result = instance.get();
         assertEquals(result, expResult);
@@ -54,7 +54,7 @@
      */
     public void testSet() {
         String initialValue = "initial value";
-        ThreadLocal instance = new ThreadLocal();
+        ThreadLocal<String> instance = new ThreadLocal<>();
         instance.set(initialValue);
         assertEquals( instance.get(), initialValue );
     }
@@ -64,7 +64,7 @@
      */
     public void testRemove() {
         String putThisIn = "value was set";
-        ThreadLocal instance = new ThreadLocal();
+        ThreadLocal<String> instance = new ThreadLocal<>();
         instance.set( putThisIn );
         instance.remove();
         assertNull( instance.get() );
@@ -75,7 +75,7 @@
      */
     public void testInitWithFactory() {
         String whatDoYouExpect = "OneWithEverything";
-        ThreadLocal<String> hotdogForTheMonk = new ThreadLocal( new StringFactory( whatDoYouExpect ));
+        ThreadLocal<String> hotdogForTheMonk = new ThreadLocal<>(new StringFactory( whatDoYouExpect ));
         assertEquals( hotdogForTheMonk.get(), hotdogForTheMonk.get() );
     }
 
--- a/test-ng/tests/org/openjdk/tests/java/util/FillableStringTest.java	Mon Aug 20 16:10:37 2012 -0400
+++ b/test-ng/tests/org/openjdk/tests/java/util/FillableStringTest.java	Sat Aug 25 19:03:55 2012 -0400
@@ -37,7 +37,8 @@
 public class FillableStringTest {
     public Stream<String> generate() {
         return Arrays.asList("one", "two", "three", "four", "five", "six")
-            .filter(s -> s.length() > 3)
+            .stream()
+            .filter(s->s.length() > 3)
             .map(s -> s.toUpperCase());
     }
 
--- a/test-ng/tests/org/openjdk/tests/java/util/LambdaTestHelpers.java	Mon Aug 20 16:10:37 2012 -0400
+++ b/test-ng/tests/org/openjdk/tests/java/util/LambdaTestHelpers.java	Sat Aug 25 19:03:55 2012 -0400
@@ -26,7 +26,6 @@
 
 import java.util.*;
 import java.util.functions.*;
-import java.util.streams.ParallelStreamable;
 import java.util.streams.StatefulSink;
 
 import static org.testng.Assert.assertEquals;
@@ -106,15 +105,15 @@
         return list;
     }
 
-    public static void assertCountSum(Iterable<Integer> it, int count, int sum) {
+    public static void assertCountSum(Iterable<? super Integer> it, int count, int sum) {
         assertCountSum(it.iterator(), count, sum);
     }
 
-    public static void assertCountSum(Iterator<Integer> it, int count, int sum) {
+    public static void assertCountSum(Iterator<? super Integer> it, int count, int sum) {
         int c = 0;
         int s = 0;
         while (it.hasNext()) {
-            int i = it.next();
+            int i = (Integer) it.next();
             c++;
             s += i;
         }
@@ -211,24 +210,6 @@
         assertContents(one, two);
     }
 
-    static<T> void assertContents(ParallelStreamable<T> pi, Iterable<T> list) {
-        Iterator<T> pI = pi.sequential().iterator();
-        Iterator<T> lI = list.iterator();
-
-        while (lI.hasNext()) {
-            assertTrue(pI.hasNext());
-            T pT = pI.next();
-            T lT = lI.next();
-            assertEquals(pT, lT);
-        }
-        assertTrue(!pI.hasNext());
-
-        for (int depth=0; depth<6; depth++) {
-            Iterable<Iterable<T>> splits = split(pi, depth);
-            assertSplitContents(splits, list);
-        }
-    }
-
     static <T> void assertSplitContents(Iterable<Iterable<T>> splits, Iterable<T> list) {
         Iterator<Iterable<T>> mI = splits.iterator();
         Iterator<T> pI = null;
@@ -261,10 +242,6 @@
         }
     }
 
-    static <T> Iterable<Iterable<T>> split(ParallelStreamable<T> pi, int depth) {
-        return splitHelper(pi.spliterator(), depth, new ArrayList<Iterable<T>>());
-    }
-
     private static <T> Iterable<Iterable<T>> splitHelper(Spliterator<T> s, int depth, List<Iterable<T>> iterables) {
         if (depth == 0) {
             List<T> list = new ArrayList<>();
--- a/test-ng/tests/org/openjdk/tests/java/util/StringJoinerTest.java	Mon Aug 20 16:10:37 2012 -0400
+++ b/test-ng/tests/org/openjdk/tests/java/util/StringJoinerTest.java	Sat Aug 25 19:03:55 2012 -0400
@@ -334,10 +334,9 @@
         list.add(TWO);
         list.add(THREE);
 
-        StringJoiner sj = list.into(new StringJoiner(",", "{", "}"));
-        Iterable<Character> ic = sj.asChars();
+        StringJoiner sj = list.stream().into(new StringJoiner(",", "{", "}"));
+        Iterator<Character> it = sj.asChars().iterator();
         String result = new String();
-        Iterator<Character> it = ic.iterator();
         while (it.hasNext()) {
             result += it.next();
         }
@@ -350,13 +349,19 @@
         sj.add(ONE);
         sj.add(TWO);
 
-        Iterable<Integer> codePoints = sj.asCodePoints();
-        long count = Iterables.count(codePoints);
+        Iterator<Integer> codePoints = sj.asCodePoints().iterator();
+        long count=0;
+        while (codePoints.hasNext()) {
+            codePoints.next();
+            ++count;
+        }
         String expected = "{"+ONE+"-"+TWO+"}";
         assertEquals(count, expected.length(),
                 "Number of codePoints in result is not correct");
         int i = 0;
-        for (Integer cp : codePoints) {
+        codePoints = sj.asCodePoints().iterator();
+        while (codePoints.hasNext()) {
+            Integer cp = codePoints.next();
             assertEquals(cp.intValue(), expected.codePointAt(i), "Code point at i=" + i);
             i++;
         }
--- a/test-ng/tests/org/openjdk/tests/java/util/concurrent/AtomicReferenceTest.java	Mon Aug 20 16:10:37 2012 -0400
+++ b/test-ng/tests/org/openjdk/tests/java/util/concurrent/AtomicReferenceTest.java	Sat Aug 25 19:03:55 2012 -0400
@@ -45,24 +45,17 @@
      * Test of updateAndGet method, of class AtomicReference.
      */
     public void testUpdateAndGet() {
-        System.out.println("getAndUpdate");
-        UnaryOperator<Integer> op = (x -> x + 2);
-        AtomicReference instance = new AtomicReference(new Integer(3));
-        Object expResult = 4;
-        Object result = instance.updateAndGet( op );
-        assertEquals(result, expResult);
+        AtomicReference<Integer> instance = new AtomicReference<>(3);
+        int result = instance.updateAndGet(x -> x + 2);
+        assertEquals(result, 5);
     }
 
     /**
      * Test of getAndUpdate method, of class AtomicReference.
      */
     public void testGetAndUpdate() {
-        System.out.println("getAndUpdate");
-        UnaryOperator<Integer> op = (x -> x + 3);
-        AtomicReference instance = new AtomicReference(new Integer(3));
-        Object expResult = 3;
-        Object result = instance.getAndUpdate( op );
-        assertEquals(result, expResult);
+        AtomicReference<Integer> instance = new AtomicReference<>(3);
+        int result = instance.getAndUpdate(x -> x + 3);
+        assertEquals(result, 3);
     }
-
 }
--- a/test-ng/tests/org/openjdk/tests/java/util/functions/UnaryOperatorTest.java	Mon Aug 20 16:10:37 2012 -0400
+++ b/test-ng/tests/org/openjdk/tests/java/util/functions/UnaryOperatorTest.java	Sat Aug 25 19:03:55 2012 -0400
@@ -48,13 +48,13 @@
      */
     public void testOperate() {
         Integer operand = 3;
-        UnaryOperator instance = new IncByTwo();
+        UnaryOperator<Object> instance = new IncByTwo();
         Integer expResult = 5;
-        Integer result = (Integer)instance.operate(operand);
+        Integer result = (Integer) instance.operate(operand);
         assertEquals(result, expResult);
     }
 
-    public class IncByTwo implements UnaryOperator {
+    public class IncByTwo implements UnaryOperator<Object> {
 
         @Override
         public Object operate(Object operand) {
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/test-ng/tests/org/openjdk/tests/java/util/streams/StreamTestDataProvider.java	Sat Aug 25 19:03:55 2012 -0400
@@ -0,0 +1,81 @@
+/*
+ * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.  Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+package org.openjdk.tests.java.util.streams;
+
+import org.openjdk.tests.java.util.LambdaTestHelpers;
+import org.testng.annotations.DataProvider;
+
+/**
+ * StreamTestDataProvider
+ *
+ * @author Brian Goetz
+ */
+public class StreamTestDataProvider {
+    private static final Integer[] to0 = new Integer[0];
+    private static final Integer[] to1 = new Integer[1];
+    private static final Integer[] to10 = new Integer[10];
+    private static final Integer[] to100 = new Integer[100];
+    private static final Integer[] to1000 = new Integer[1000];
+    private static final Integer[] reversed = new Integer[100];
+    private static final Integer[] ones = new Integer[100];
+    private static final Integer[] twice = new Integer[200];
+    private static final Integer[] pseudoRandom;
+
+    static {
+        Integer[][] arrays = {to0, to1, to10, to100, to1000};
+        for (Integer[] arr : arrays) {
+            for (int i = 0; i < arr.length; i++) {
+                arr[i] = i;
+            }
+        }
+        for (int i = 0; i < reversed.length; i++) {
+            reversed[i] = reversed.length - i;
+        }
+        for (int i = 0; i < ones.length; i++) {
+            ones[i] = 1;
+        }
+        System.arraycopy(to100, 0, twice, 0, to100.length);
+        System.arraycopy(to100, 0, twice, to100.length, to100.length);
+        pseudoRandom = new Integer[LambdaTestHelpers.LONG_STRING.length()];
+        for (int i = 0; i < LambdaTestHelpers.LONG_STRING.length(); i++) {
+            pseudoRandom[i] = (int) LambdaTestHelpers.LONG_STRING.charAt(i);
+        }
+    }
+
+    // Return an array of Integer[]
+    @DataProvider(name = "opArrays")
+    public static Object[][] makeOpArrays() {
+        return new Object[][]{
+                {"array to zero", to0},
+                {"array to one", to1},
+                {"array to ten", to10},
+                {"array to 100", to100},
+                {"array to 1000", to1000},
+                {"array 100 ones", ones},
+                {"array to 100 twice", twice},
+                {"array reversed", reversed},
+                {"array pseudorandom", pseudoRandom}};
+    }
+}
--- a/test-ng/tests/org/openjdk/tests/java/util/streams/ops/CumulateOpTest.java	Mon Aug 20 16:10:37 2012 -0400
+++ b/test-ng/tests/org/openjdk/tests/java/util/streams/ops/CumulateOpTest.java	Sat Aug 25 19:03:55 2012 -0400
@@ -24,6 +24,7 @@
  */
 package org.openjdk.tests.java.util.streams.ops;
 
+import org.openjdk.tests.java.util.streams.StreamTestDataProvider;
 import org.testng.annotations.Test;
 
 import java.util.streams.ops.CumulateOp;
@@ -36,6 +37,7 @@
  *
  * @author Brian Goetz
  */
+@Test
 public class CumulateOpTest extends StreamOpTestCase {
     public void testRawIterator() {
         assertCountSum(CumulateOp.iterator(countTo(0).iterator(), rPlus), 0, 0);
@@ -44,7 +46,7 @@
         assertContents(CumulateOp.iterator(countTo(5).iterator(), rMin), 1, 1, 1, 1, 1);
     }
 
-    @Test(dataProvider = "opArrays")
+    @Test(dataProvider = "opArrays", dataProviderClass = StreamTestDataProvider.class)
     public void testOps(String name, Integer[] data) {
         assertConsistentOpBehavior(data, new CumulateOp<>(rPlus), l -> l);
         assertConsistentOpBehavior(data, new CumulateOp<>(rMin), l -> l);
--- a/test-ng/tests/org/openjdk/tests/java/util/streams/ops/FilterOpTest.java	Mon Aug 20 16:10:37 2012 -0400
+++ b/test-ng/tests/org/openjdk/tests/java/util/streams/ops/FilterOpTest.java	Sat Aug 25 19:03:55 2012 -0400
@@ -24,6 +24,7 @@
  */
 package org.openjdk.tests.java.util.streams.ops;
 
+import org.openjdk.tests.java.util.streams.StreamTestDataProvider;
 import org.testng.annotations.Test;
 
 import java.util.streams.ops.FilterOp;
@@ -46,7 +47,7 @@
         assertCountSum(FilterOp.iterator(FilterOp.iterator(countTo(10).iterator(), pEven), pOdd), 0, 0);
     }
 
-    @Test(dataProvider = "opArrays")
+    @Test(dataProvider = "opArrays", dataProviderClass = StreamTestDataProvider.class)
     public void testOps(String name, Integer[] data) {
         assertConsistentOpBehavior(data, new FilterOp<>(pTrue), l -> l);
         assertConsistentOpBehavior(data, new FilterOp<>(pFalse), l -> 0);
--- a/test-ng/tests/org/openjdk/tests/java/util/streams/ops/FlatMapOpTest.java	Mon Aug 20 16:10:37 2012 -0400
+++ b/test-ng/tests/org/openjdk/tests/java/util/streams/ops/FlatMapOpTest.java	Sat Aug 25 19:03:55 2012 -0400
@@ -24,12 +24,13 @@
  */
 package org.openjdk.tests.java.util.streams.ops;
 
+import org.openjdk.tests.java.util.streams.StreamTestDataProvider;
 import org.testng.annotations.Test;
 
 import java.util.Arrays;
 import java.util.functions.FlatMapper;
 import java.util.functions.Sink;
-import java.util.streams.Streamable;
+import java.util.streams.Stream;
 import java.util.streams.ops.FlatMapOp;
 
 import static org.openjdk.tests.java.util.LambdaTestHelpers.*;
@@ -39,6 +40,7 @@
  *
  * @author Brian Goetz
  */
+@Test
 public class FlatMapOpTest extends StreamOpTestCase {
     private static final FlatMapper<Integer, Integer> mfIntToBits = new FlatMapper<Integer, Integer>() {
         @Override
@@ -55,8 +57,8 @@
 
     public void testFlatMap() {
         String[] stringsArray = {"hello", "there", "", "yada"};
-        Streamable<String> strings = Arrays.asList(stringsArray);
-        assertConcat(strings.flatMap(flattenChars), "hellothereyada");
+        Stream<String> strings = Arrays.asList(stringsArray).stream();
+        assertConcat(strings.flatMap(flattenChars).iterator(), "hellothereyada");
 
         assertCountSum(FlatMapOp.iterator(countTo(10).iterator(), mfId), 10, 55);
         assertCountSum(FlatMapOp.iterator(countTo(10).iterator(), mfNull), 0, 0);
@@ -66,7 +68,7 @@
         assertConsistentOpBehavior(new String[] { LONG_STRING }, new FlatMapOp<>(flattenChars), null);
     }
 
-    @Test(dataProvider = "opArrays")
+    @Test(dataProvider = "opArrays", dataProviderClass = StreamTestDataProvider.class)
     public void testOps(String name, Integer[] data) {
         assertConsistentOpBehavior(data, new FlatMapOp<>(mfId), l -> l);
         assertConsistentOpBehavior(data, new FlatMapOp<>(mfNull), l -> 0);
--- a/test-ng/tests/org/openjdk/tests/java/util/streams/ops/GroupByOpTest.java	Mon Aug 20 16:10:37 2012 -0400
+++ b/test-ng/tests/org/openjdk/tests/java/util/streams/ops/GroupByOpTest.java	Sat Aug 25 19:03:55 2012 -0400
@@ -24,24 +24,18 @@
  */
 package org.openjdk.tests.java.util.streams.ops;
 
-import java.util.Arrays;
-import java.util.List;
+import org.openjdk.tests.java.util.streams.StreamTestDataProvider;
+import org.testng.annotations.Test;
+
+import java.util.Collection;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.functions.Mappers;
-import java.util.streams.SizedStreamable;
+import java.util.streams.Stream;
 import java.util.streams.Streamable;
-import org.testng.annotations.Test;
-
-import java.util.streams.ops.FilterOp;
 import java.util.streams.ops.GroupByOp;
-import java.util.streams.ops.MapOp;
 
 import static org.openjdk.tests.java.util.LambdaTestHelpers.*;
-import static org.openjdk.tests.java.util.LambdaTestHelpers.pEven;
-import static org.openjdk.tests.java.util.LambdaTestHelpers.pOdd;
-
-import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.assertEquals;
 
 /**
@@ -54,15 +48,22 @@
     public void testRawIterator() {
         GroupByOp<Integer, Boolean> grouping = new GroupByOp<>(Mappers.forPredicate(pEven, true, false));
 
-        Map<Boolean,SizedStreamable<Integer>> result = iteratorToStatefulSink(countTo(10).iterator(), grouping.sink());
+        Map<Boolean,Streamable<Integer>> result = iteratorToStatefulSink(countTo(10).iterator(), grouping.sink());
 
         assertEquals(2, result.keySet().size());
-        for(SizedStreamable<Integer> group : result.values()) {
-            assertEquals(5, group.size());
+        for(Streamable<Integer> group : result.values()) {
+            int count = 0;
+            Stream<Integer> stream = group.stream();
+            Iterator<Integer> it = stream.iterator();
+            while (it.hasNext()) {
+                it.next();
+                ++count;
+            }
+            assertEquals(5, count);
         }
     }
 
-    @Test(dataProvider = "opArrays")
+    @Test(dataProvider = "opArrays", dataProviderClass = StreamTestDataProvider.class)
     public void testOps(String name, Integer[] data) {
         assertConsistentOpBehavior(data, new GroupByOp<>(mId));
         assertConsistentOpBehavior(data, new GroupByOp<>(mZero));
--- a/test-ng/tests/org/openjdk/tests/java/util/streams/ops/MapOpTest.java	Mon Aug 20 16:10:37 2012 -0400
+++ b/test-ng/tests/org/openjdk/tests/java/util/streams/ops/MapOpTest.java	Sat Aug 25 19:03:55 2012 -0400
@@ -24,6 +24,7 @@
  */
 package org.openjdk.tests.java.util.streams.ops;
 
+import org.openjdk.tests.java.util.streams.StreamTestDataProvider;
 import org.testng.annotations.Test;
 
 import java.util.streams.ops.FilterOp;
@@ -48,7 +49,7 @@
         assertCountSum(MapOp.iterator(MapOp.iterator(countTo(10).iterator(), mDoubler), mDoubler), 10, 220);
     }
 
-    @Test(dataProvider = "opArrays")
+    @Test(dataProvider = "opArrays", dataProviderClass = StreamTestDataProvider.class)
     public void testOps(String name, Integer[] data) {
         assertConsistentOpBehavior(data, new MapOp<>(mId), l -> l);
         assertConsistentOpBehavior(data, new MapOp<>(mZero), l -> l);
--- a/test-ng/tests/org/openjdk/tests/java/util/streams/ops/SortedOpTest.java	Mon Aug 20 16:10:37 2012 -0400
+++ b/test-ng/tests/org/openjdk/tests/java/util/streams/ops/SortedOpTest.java	Sat Aug 25 19:03:55 2012 -0400
@@ -24,12 +24,10 @@
  */
 package org.openjdk.tests.java.util.streams.ops;
 
+import org.openjdk.tests.java.util.streams.StreamTestDataProvider;
 import org.testng.annotations.Test;
 
 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.streams.SequentialPipeline;
 import java.util.streams.ops.SortedOp;
 
 import static org.openjdk.tests.java.util.LambdaTestHelpers.*;
@@ -47,10 +45,10 @@
         assertCountSum(SortedOp.iterator(countTo(10).iterator(), cInteger.reverse()), 10, 55);
     }
 
-    @Test(dataProvider = "opArrays")
+    @Test(dataProvider = "opArrays", dataProviderClass = StreamTestDataProvider.class)
     public void testOps(String name, Integer[] data) {
         SortedOp<Integer> op = new SortedOp<>(cInteger);
-        assertSorted(new SequentialPipeline<>(Arrays.asList(data), op).into(new ArrayList<Integer>()).iterator());
+        assertSorted(seq(data, op).into(new ArrayList<Integer>()).iterator());
         assertConsistentOpBehavior(data, op, l->l);
         assertConsistentOpBehavior(data, new SortedOp<>(cInteger.reverse()), l -> l);
     }
--- a/test-ng/tests/org/openjdk/tests/java/util/streams/ops/StreamOpTestCase.java	Mon Aug 20 16:10:37 2012 -0400
+++ b/test-ng/tests/org/openjdk/tests/java/util/streams/ops/StreamOpTestCase.java	Sat Aug 25 19:03:55 2012 -0400
@@ -30,6 +30,10 @@
 import org.testng.annotations.Test;
 
 import java.util.*;
+import java.util.concurrent.ForkJoinTask;
+import java.util.concurrent.ForkJoinUtils;
+import java.util.functions.BiPredicate;
+import java.util.functions.Factory;
 import java.util.functions.Mapper;
 import java.util.functions.Sink;
 import java.util.streams.*;
@@ -47,186 +51,205 @@
 @Test
 public abstract class StreamOpTestCase {
 
+    protected<T> Stream<T> seq(T[] data) {
+        return Streams.stream(Arrays.iterator(data), data.length);
+    }
+
+    protected<T> Stream<T> par(T[] data) {
+        return Streams.parallel(Arrays.spliterator(data), data.length);
+    }
+
+    protected<T, U> Stream<U> seq(T[] data, IntermediateOp<T,U> op) {
+        return ((LinearPipeline) Streams.stream(Arrays.iterator(data), data.length)).pipeline(op);
+    }
+
+    protected<T, U> Stream<T> par(T[] data, IntermediateOp<T,U> op) {
+        return ((LinearPipeline) Streams.parallel(Arrays.spliterator(data), data.length)).pipeline(op);
+    }
+
+    protected<T, U> U seq(T[] data, TerminalOp<T,U> op) {
+        return ((LinearPipeline<?, T>) Streams.stream(Arrays.iterator(data), data.length)).pipeline(op);
+    }
+
+    protected<T, U> U par(T[] data, TerminalOp<T,U> op) {
+        return ((LinearPipeline<?, T>) Streams.parallel(Arrays.spliterator(data), data.length)).pipeline(op);
+    }
+
+    protected<T> Iterator<T> iterator(T[] data) {
+        return Arrays.iterator(data);
+    }
+
+    protected<T> void forEach(T[] data, Sink<? super T> sink) {
+        for (T t : data)
+            sink.accept(t);
+    }
+
     protected <T, U> void assertConsistentOpBehavior(T[] data,
-            ElementwiseOp<T, U> op,
-            Mapper<Integer, Integer> lengthMapper) {
-        Streamable<T> seq = Arrays.asList(data);
-        ParallelStreamable<T> par = Arrays.parallel(data);
-        int length = data.length;
-        if (lengthMapper != null) {
-            length = lengthMapper.map(data.length);
-        }
+                                                     IntermediateOp<T, U> op,
+                                                     Mapper<Integer, Integer> lengthMapper) {
 
-        ArraySink<U> sink1 = new ArraySink<>(length);
-        ArraySink<U> sink2 = new ArraySink<>(length);
-        ArraySink<U> sink3 = new ArraySink<>(length);
-        ArraySink<U> sink4 = new ArraySink<>(length);
-        ArraySink<U> sink5 = new ArraySink<>(length);
-        ArraySink<U> sink6 = new ArraySink<>(length);
+        int length = (lengthMapper == null) ? 100 : lengthMapper.map(data.length);
+
+        ArraySink<U> refResult = new ArraySink<>(length);
 
         // First pass -- grab an iterator and wrap it
-        Iterator<U> it = op.iterator(seq.iterator());
+        Iterator<U> it = op.iterator(iterator(data));
         while (it.hasNext()) {
-            sink1.accept(it.next());
+            refResult.accept(it.next());
         }
 
         // If a length mapper has been specified, make sure it is right
         if (lengthMapper != null) {
-            Assert.assertEquals(sink1.size(), (int)lengthMapper.map(data.length));
+            Assert.assertEquals(refResult.size(), (int)lengthMapper.map(data.length));
         }
 
         // Second pass -- create a sink and wrap it
         {
+            ArraySink<U> sink2 = new ArraySink<>(length);
             Sink<T> wrapped = op.sink(sink2);
             if (wrapped instanceof StatefulSink) {
                 StatefulSink<T, ?> stateful = (StatefulSink<T, ?>)wrapped;
                 stateful.begin(-1);
-                seq.forEach(stateful);
+                forEach(data, stateful);
                 stateful.end();
             } else {
-                seq.forEach(wrapped);
+                forEach(data, wrapped);
             }
-            assertEquals(sink1, sink2);
+            assertEquals(refResult, sink2);
         }
 
         // Third pass -- wrap with SequentialPipeline.op, and iterate in push mode
-        new SequentialPipeline<>(seq, op).forEach(sink3);
-        assertEquals(sink1, sink3);
+        {
+            ArraySink<U> sink3 = new ArraySink<>(length);
+            seq(data, op).forEach(sink3);
+            assertEquals(refResult, sink3);
+        }
 
         // Wrap with SequentialPipeline.op, and iterate in pull mode
-        Iterator<U> seqIter = new SequentialPipeline<>(seq, op);
-        while (seqIter.hasNext()) {
-            sink4.accept(seqIter.next());
+        {
+            ArraySink<U> sink4 = new ArraySink<>(length);
+            Iterator<U> seqIter = seq(data, op).iterator();
+            while (seqIter.hasNext()) {
+                sink4.accept(seqIter.next());
+            }
+            assertEquals(refResult, sink4);
         }
-        assertEquals(sink1, sink4);
 
         // Wrap with SequentialPipeline.op, and iterate in mixed mode
-        Stream<U> mixedIter = new SequentialPipeline<>(seq, op);
-        if (mixedIter.hasNext()) {
-            sink5.accept(mixedIter.next());
+        {
+            ArraySink<U> sink5 = new ArraySink<>(length);
+            Stream<U> stream = seq(data, op);
+            Iterator<U> iter = stream.iterator();
+            if (iter.hasNext()) {
+                sink5.accept(iter.next());
+            }
+            stream.forEach(sink5);
+            assertEquals(refResult, sink5);
         }
-        mixedIter.forEach(sink5);
-        assertEquals(sink1, sink5);
+
+        // For stateful ops, validate Node contents with fake ParallelHelper
+        {
+            if (op.isStateful()) {
+                StatefulOp<T, U, ?> sop = (StatefulOp<T, U, ?>) op;
+                ArraySink<U> sink6a = new ArraySink<>(length);
+                TreeUtils.Node<U> result = sop.computeParallel(new DummyParallelOpHelper<>(data));
+                for (U u : result)
+                    sink6a.accept(u);
+                assertEquals(refResult, sink6a);
+            }
+        }
 
         // Wrap with ParallelPipeline.op.sequential
-        if (op instanceof StatelessOp) {
-            new ParallelPipeline<>(par, (StatelessOp<T, U>)op).sequential().forEach(sink6);
-        } else if (op instanceof StatefulOp) {
-            SizedStreamable<U> sequential = ParallelPipeline.wrap(par).pipeline((StatefulOp<T, U, ?>)op).sequential();
-            sequential.forEach(sink6);
-        } else {
-            fail("Op neither stateful nor stateless");
+        {
+            ArraySink<U> sink6 = new ArraySink<>(length);
+            ((LinearPipeline) par(data, op)).sequential().forEach(sink6);
+            assertEquals(refResult, sink6);
         }
-        assertEquals(sink1, sink6);
+
+        // Wrap with ParallelPipeline.op.toArray
+        {
+            ArraySink<U> sink7 = new ArraySink<>(length);
+            Object[] array = par(data, op).toArray();
+            for (Object t : array)
+                sink7.accept((U) t);
+            assertEquals(refResult, sink7);
+        }
+
+        // Wrap with ParallelPipeline.op.into
+        {
+            ArraySink<U> sink8 = new ArraySink<>(length);
+            ArrayList list = (ArrayList) ((LinearPipeline) par(data, op)).sequential().into(new ArrayList());
+            for (Object u : list)
+                sink8.accept((U) u);
+            assertEquals(refResult, sink8);
+        }
 
         // @@@ Extend parallel testing to include
         //  - More ways to get a PSS: Arrays.asParallel, Arrays.asList().parallel(), new ArrayList().parallel()
-        //  - More ways to iterate the PSS: .sequential().into(), iterate result of op
+        //  - More ways to iterate the PSS: iterate result of op
+        // Extends testing to test whether computation happens in- or out-of-thread
     }
 
-    protected <T, U> void assertConsistentOpBehavior(T[] data, StatelessOp<T, U> op) {
+    protected <T, U> void assertConsistentOpBehavior(T[] data, IntermediateOp<T, U> op) {
         assertConsistentOpBehavior(data, op, null);
     }
 
-    protected <T, U> void assertConsistentOpBehavior(T[] data, EagerOp<T, U> op) {
-        SizedStreamable<T> seq = Arrays.asList(data);
-        ParallelStreamable<T> par = Arrays.parallel(data);
-        int length = data.length;
+    protected <T, U> void assertConsistentOpBehavior(T[] data, TerminalOp<T, U> op) {
+        assertConsistentOpBehavior(data, op, (u, v) -> u.equals(v));
+    }
 
+
+    protected <T, U> void assertConsistentOpBehavior(T[] data, ShortCircuitTerminalOp<T, U> op) {
+        // First pass -- wrap Iterator
+        U answer1 = op.evaluate(iterator(data));
+
+        // Second pass -- wrap with SequentialPipeline.op
+        Assert.assertEquals(answer1, seq(data, op));
+
+        // Fourth pass -- wrap with ParallelPipeline.op
+        Assert.assertEquals(answer1, par(data, op));
+    }
+
+    // @@@ Merge these two into a single method
+
+    protected <T, U> void assertConsistentOpBehavior(T[] data, TerminalOp<T, U> op, BiPredicate<U,U> equalator) {
         // First pass -- create a sink and evaluate, with no size advice
         StatefulSink<T, U> sink = op.sink();
         sink.begin(-1);
-        seq.forEach(sink);
+        forEach(data, sink);
         U answer1 = sink.end();
 
-        // Second pass -- create a sink and evaluate, with no size advice
+        // Second pass -- create a sink and evaluate, with size advice
         StatefulSink<T, U> sink2 = op.sink();
-        sink2.begin(seq.size());
-        seq.forEach(sink2);
+        sink2.begin(data.length);
+        forEach(data, sink2);
         U answer2 = sink2.end();
-        Assert.assertEquals(answer1, answer2);
+        Assert.assertTrue(equalator.eval(answer1, answer2));
 
         // Third pass -- wrap with SequentialPipeline.op
-        U answer3 = SequentialPipeline.wrap(seq).pipeline(op);
-        Assert.assertEquals(answer1, answer3);
+        U answer3 = seq(data, op);
+        Assert.assertTrue(equalator.eval(answer1, answer3));
 
         // Fourth pass -- wrap with ParallelPipeline.op
-        U answer4 = ParallelPipeline.wrap(par).pipeline(op);
-        Assert.assertEquals(answer1, answer4);
-    }
-
-    protected <T, U> void assertConsistentOpBehavior(T[] data, ShortCircuitEagerOp<T, U> op) {
-        SizedStreamable<T> seq = Arrays.asList(data);
-        ParallelStreamable<T> par = Arrays.parallel(data);
-        int length = data.length;
-
-        // First pass -- wrap Iterator
-        U answer1 = op.evaluate(seq.iterator());
-
-        // Second pass -- wrap with SequentialPipeline.op
-        U answer2 = SequentialPipeline.wrap(seq).pipeline(op);
-        Assert.assertEquals(answer1, answer2);
-
-        // Third pass -- wrap with ParallelPipeline.op
-        U answer3 = ParallelPipeline.wrap(par).pipeline(op);
-        Assert.assertEquals(answer1, answer3);
+        U answer4 = par(data, op);
+        Assert.assertTrue(equalator.eval(answer1, answer4));
     }
 
     private static <U> void assertEquals(ArraySink<U> sink1, ArraySink<U> sink2) {
-        Assert.assertEquals(sink1.size(), sink2.size());
-        Iterator<U> it1 = sink1.iterator();
-        Iterator<U> it2 = sink2.iterator();
-        while (it1.hasNext()) {
-            assertTrue(it2.hasNext());
-            Assert.assertEquals(it1.next(), it2.next());
+        try {
+            Assert.assertEquals(sink1.size(), sink2.size());
+            Iterator<U> it1 = sink1.iterator();
+            Iterator<U> it2 = sink2.iterator();
+            while (it1.hasNext()) {
+                assertTrue(it2.hasNext());
+                Assert.assertEquals(it1.next(), it2.next());
+            }
+            assertFalse(it2.hasNext());
         }
-        assertFalse(it2.hasNext());
-    }
-
-    private static final Integer[] to0 = new Integer[0];
-    private static final Integer[] to1 = new Integer[1];
-    private static final Integer[] to10 = new Integer[10];
-    private static final Integer[] to100 = new Integer[100];
-    private static final Integer[] to1000 = new Integer[1000];
-    private static final Integer[] reversed = new Integer[100];
-    private static final Integer[] ones = new Integer[100];
-    private static final Integer[] twice = new Integer[200];
-    private static final Integer[] pseudoRandom;
-
-    static {
-        Integer[][] arrays = {to0, to1, to10, to100, to1000};
-        for (Integer[] arr : arrays) {
-            for (int i = 0; i < arr.length; i++) {
-                arr[i] = i;
-            }
+        catch (AssertionError e) {
+            System.out.printf("Expected %s, found %s%n", sink1, sink2);
+            throw e;
         }
-        for (int i = 0; i < reversed.length; i++) {
-            reversed[i] = reversed.length - i;
-        }
-        for (int i = 0; i < ones.length; i++) {
-            ones[i] = 1;
-        }
-        System.arraycopy(to100, 0, twice, 0, to100.length);
-        System.arraycopy(to100, 0, twice, to100.length, to100.length);
-        pseudoRandom = new Integer[LambdaTestHelpers.LONG_STRING.length()];
-        for (int i = 0; i < LambdaTestHelpers.LONG_STRING.length(); i++) {
-            pseudoRandom[i] = (int)LambdaTestHelpers.LONG_STRING.charAt(i);
-        }
-    }
-
-    // Return an array of Integer[]
-    @DataProvider(name = "opArrays")
-    public static Object[][] makeOpArrays() {
-        return new Object[][]{
-            {"to zero", to0},
-            {"to one", to1},
-            {"to ten", to10},
-            {"to 100", to100},
-            {"to 1000", to1000},
-            {"100 ones", ones},
-            {"to 100 twice", twice},
-            {"reversed", reversed},
-            {"pseudorandom", pseudoRandom}};
     }
 
     private static class ArraySink<T> implements Sink<T>, Traversable<T>, Sized {
@@ -237,7 +260,7 @@
 
         @SuppressWarnings("unchecked")
         public ArraySink(int initialSize) {
-            array = (T[])new Object[initialSize];
+            array = (T[]) new Object[initialSize];
         }
 
         public ArraySink() {
@@ -253,7 +276,7 @@
 
         @Override
         public Iterator<T> iterator() {
-            return Arrays.iterator(array);
+            return Arrays.iterator(array, 0, offset);
         }
 
         @Override
@@ -271,5 +294,43 @@
         public int size() {
             return offset;
         }
+
+        @Override
+        public String toString() {
+            return String.format("ArraySink[%d](%s)", offset, Arrays.toString(array));
+        }
+    }
+
+    private static class DummyParallelOpHelper<T> implements ParallelOp.ParallelOpHelper<T,T> {
+        private final T[] data;
+
+        public DummyParallelOpHelper(T[] data) {
+            this.data = data;
+        }
+
+        @Override
+        public int suggestDepth() {
+            return ForkJoinUtils.suggestDepth(data.length);
+        }
+
+        @Override
+        public Spliterator spliterator() {
+            return Arrays.spliterator(data);
+        }
+
+        @Override
+        public Sink sink(Sink sink) {
+            return sink;
+        }
+
+        @Override
+        public Iterator iterator() {
+            return Arrays.iterator(data);
+        }
+
+        @Override
+        public Object invoke(ForkJoinTask task) {
+            return ForkJoinUtils.defaultFJPool().invoke(task);
+        }
     }
 }
--- a/test-ng/tests/org/openjdk/tests/java/util/streams/ops/ToArrayOpTest.java	Mon Aug 20 16:10:37 2012 -0400
+++ b/test-ng/tests/org/openjdk/tests/java/util/streams/ops/ToArrayOpTest.java	Sat Aug 25 19:03:55 2012 -0400
@@ -24,6 +24,8 @@
  */
 package org.openjdk.tests.java.util.streams.ops;
 
+import org.openjdk.tests.java.util.LambdaTestHelpers;
+import org.openjdk.tests.java.util.streams.StreamTestDataProvider;
 import org.testng.annotations.Test;
 
 import java.util.streams.ops.ToArrayOp;
@@ -41,15 +43,14 @@
 public class ToArrayOpTest extends StreamOpTestCase {
     public void testRawIterator() {
         ToArrayOp<Integer> op = ToArrayOp.singleton();
-        assertCountSum((List<Integer>) (List) Arrays.asList(
-                iteratorToStatefulSink(countTo(0).iterator(), op.sink())), 0, 0);
-        assertCountSum((List<Integer>) (List) Arrays.asList(
-                iteratorToStatefulSink(countTo(10).iterator(), op.sink())), 10, 55);
+        Object[] result0 = iteratorToStatefulSink(countTo(0).iterator(), op.sink());
+        Object[] result10 = iteratorToStatefulSink(countTo(10).iterator(), op.sink());
+        assertCountSum(Arrays.asList(result0), 0, 0);
+        assertCountSum(Arrays.asList(result10), 10, 55);
     }
 
-    @Test(dataProvider = "opArrays")
+    @Test(dataProvider = "opArrays", dataProviderClass = StreamTestDataProvider.class)
     public void testOps(String name, Integer[] data) {
-        ToArrayOp<Integer> op = ToArrayOp.singleton();
-        assertConsistentOpBehavior(data, op);
+        assertConsistentOpBehavior(data, ToArrayOp.<Integer>singleton(), Arrays::equals);
     }
 }
--- a/test-ng/tests/org/openjdk/tests/java/util/streams/ops/UniqOpTest.java	Mon Aug 20 16:10:37 2012 -0400
+++ b/test-ng/tests/org/openjdk/tests/java/util/streams/ops/UniqOpTest.java	Sat Aug 25 19:03:55 2012 -0400
@@ -24,12 +24,10 @@
  */
 package org.openjdk.tests.java.util.streams.ops;
 
+import org.openjdk.tests.java.util.streams.StreamTestDataProvider;
 import org.testng.annotations.Test;
 
 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.streams.SequentialPipeline;
 import java.util.streams.ops.UniqOp;
 
 import static org.openjdk.tests.java.util.LambdaTestHelpers.*;
@@ -48,10 +46,11 @@
         assertCountSum(UniqOp.<Integer>singleton().iterator(countTo(10).iterator()), 10, 55);
     }
 
-    @Test(dataProvider = "opArrays")
+    @Test(dataProvider = "opArrays", dataProviderClass = StreamTestDataProvider.class)
     public void testOps(String name, Integer[] data) {
-        UniqOp<Integer> op = UniqOp.<Integer>singleton();
-        assertUnique(new SequentialPipeline<>(Arrays.asList(data), op).into(new ArrayList<Integer>()).iterator());
+        UniqOp<Integer> op = UniqOp.singleton();
+        ArrayList<Integer> result = seq(data, op).into(new ArrayList<Integer>());
+        assertUnique(result.iterator());
         assertConsistentOpBehavior(data, op, null);
     }
 }
--- a/test-ng/tests/org/openjdk/tests/javac/LambdaTranslationTest1.java	Mon Aug 20 16:10:37 2012 -0400
+++ b/test-ng/tests/org/openjdk/tests/javac/LambdaTranslationTest1.java	Sat Aug 25 19:03:55 2012 -0400
@@ -116,7 +116,7 @@
         b7b.accept("implicit this:");
         assertResult("b9: implicit this: instance:flew");
 
-        Sink<Object> b10 = t -> {setResult(String.format("b10: new Thing: %s", (new LT1Thing(t)).str));};
+        Sink<Object> b10 = t -> {setResult(String.format("b10: new LT1Thing: %s", (new LT1Thing(t)).str));};
         b10.accept("thing");
         assertResult("b10: new LT1Thing: thing");
 
--- a/test-ng/tests/org/openjdk/tests/javac/MethodReferenceTestInstanceMethod.java	Mon Aug 20 16:10:37 2012 -0400
+++ b/test-ng/tests/org/openjdk/tests/javac/MethodReferenceTestInstanceMethod.java	Sat Aug 25 19:03:55 2012 -0400
@@ -37,7 +37,8 @@
 public class MethodReferenceTestInstanceMethod {
     public Stream<String> generate() {
         return Arrays.asList("one", "two", "three", "four", "five", "six")
-            .filter(s -> s.length() > 3)
+            .stream()
+            .filter(s->s.length() > 3)
             .map(s -> s.toUpperCase());
     }
 
--- a/test-ng/tests/org/openjdk/tests/separate/SourceModel.java	Mon Aug 20 16:10:37 2012 -0400
+++ b/test-ng/tests/org/openjdk/tests/separate/SourceModel.java	Sat Aug 25 19:03:55 2012 -0400
@@ -190,20 +190,20 @@
 
         protected void generateName(PrintWriter pw) {
             pw.print(this.name);
-            pw.print(this.parameters
-                .map(x -> x.toString())
-                .into(new StringJoiner(",", "<", ">").setEmptyOutput("")));
+            pw.print(this.parameters.stream()
+                             .map(x->x.toString())
+                             .into(new StringJoiner(",", "<", ">").setEmptyOutput("")));
             pw.print(" ");
         }
     
         protected void generateBody(PrintWriter pw, String superSpec) {
-            pw.print(this.supertypes
-                .map(x -> x.toString())
+            pw.print(this.supertypes.stream()
+                .map(x->x.toString())
                 .into(new StringJoiner(",", superSpec + " ", " ")
                           .setEmptyOutput("")));
             pw.println("{ ");
-            pw.print(this.methods
-                .map(x -> x.toString())
+            pw.print(this.methods.stream()
+                .map(x->x.toString())
                 .into(new StringJoiner(", ", "\n    ", "\n")
                           .setEmptyOutput("")));
             pw.println("}");
@@ -228,11 +228,13 @@
             if (superclass != null) {
                 dependencies.put(superclass.getName(), superclass);
             }
-            getSupertypes().map(x -> x.getType())
+            getSupertypes().stream()
+                           .map(x->x.getType())
                            .mapped(x -> x.getName())
-                           .swap().into(dependencies);
+                           .swap()
+                           .into(dependencies);
             // Do these last so that they override 
-            this.typeDependencies.mapped(x -> x.getName())
+            this.typeDependencies.stream().mapped(x->x.getName())
                                  .swap().into(dependencies);
             return dependencies.values();
         }
@@ -378,8 +380,8 @@
     
         public void generate(PrintWriter pw) {
             pw.print(supertype.getName());
-            pw.print(getArguments()
-                .map(x -> x.toString())
+            pw.print(getArguments().stream()
+                .map(x->x.toString())
                 .into(new StringJoiner(",", "<", ">").setEmptyOutput("")));
         }
     }
@@ -412,8 +414,8 @@
     
         protected void generateDecl(PrintWriter pw) {
             pw.printf("%s %s(", returnType, name);
-            pw.print(parameters
-                .map(x -> x.toString())
+            pw.print(parameters.stream()
+                .map(x->x.toString())
                 .into(new StringJoiner(",")));
             pw.print(")");
         }
--- a/test-ng/tests/org/openjdk/tests/separate/TestHarness.java	Mon Aug 20 16:10:37 2012 -0400
+++ b/test-ng/tests/org/openjdk/tests/separate/TestHarness.java	Sat Aug 25 19:03:55 2012 -0400
@@ -25,18 +25,16 @@
 
 package org.openjdk.tests.separate;
 
+import org.testng.ITestResult;
+import org.testng.annotations.AfterMethod;
+
 import java.lang.reflect.InvocationTargetException;
-import java.util.*;
-import java.io.*;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.StringJoiner;
 
-import org.testng.ITestResult;
-import org.testng.annotations.Test;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.AfterSuite;
-
+import static org.openjdk.tests.separate.SourceModel.Class;
 import static org.openjdk.tests.separate.SourceModel.*;
-import static org.openjdk.tests.separate.SourceModel.Class;
 import static org.testng.Assert.*;
 
 public class TestHarness {
@@ -124,7 +122,7 @@
         Class stub = new Class(specimen.getName(), cm);
 
         String params =
-            Arrays.asList(args).into(new StringJoiner(", ")).toString();
+            Arrays.asList(args).stream().into(new StringJoiner(", ")).toString();
 
         StaticMethod sm = new StaticMethod(
             method.getReturnType(), method.getName(),
@@ -154,7 +152,7 @@
         Class cstub = new Class(specimen.getName());
 
         String params =
-            Arrays.asList(args).into(new StringJoiner(", ")).toString();
+            Arrays.asList(args).stream().into(new StringJoiner(", ")).toString();
 
         StaticMethod sm = StaticMethod.returns(
             String.format("((%s)(new %s())).%s(%s)", iface.toString(),
--- a/test-ng/tests/org/openjdk/tests/vm/FDSeparateCompilationTest.java	Mon Aug 20 16:10:37 2012 -0400
+++ b/test-ng/tests/org/openjdk/tests/vm/FDSeparateCompilationTest.java	Sat Aug 25 19:03:55 2012 -0400
@@ -25,10 +25,7 @@
 
 package org.openjdk.tests.vm;
 
-import java.lang.reflect.*;
 import java.util.*;
-import java.io.File;
-import java.io.IOException;
 
 import org.testng.ITestResult;
 import org.testng.annotations.Test;
@@ -62,8 +59,8 @@
         ArrayList<Object[]> allCases = new ArrayList<>();
 
         HierarchyGenerator hg = new HierarchyGenerator();
-        hg.getOK().map(x -> new Object[] { x }).into(allCases);
-        hg.getErr().map(x -> new Object[] { x }).into(allCases);
+        hg.getOK().stream().map(x->new Object[]{x}).into(allCases);
+        hg.getErr().stream().map(x -> new Object[] { x }).into(allCases);
         return allCases.toArray(new Object[0][]);
     }