changeset 6040:56ec158d6c32

Add Stream.concat, MapStream.concat
author psandoz
date Fri, 21 Sep 2012 12:39:38 -0400
parents 712690d71c75
children 6ac088dd69ad
files src/share/classes/java/util/Iterators.java src/share/classes/java/util/streams/MapPipeline.java src/share/classes/java/util/streams/MapStream.java src/share/classes/java/util/streams/Stream.java src/share/classes/java/util/streams/ValuePipeline.java src/share/classes/java/util/streams/ops/ConcatOp.java src/share/classes/java/util/streams/ops/TreeUtils.java test-ng/tests/org/openjdk/tests/java/util/LambdaTestHelpers.java test-ng/tests/org/openjdk/tests/java/util/streams/MapStreamTestDataProvider.java test-ng/tests/org/openjdk/tests/java/util/streams/ops/BiMatchOpTest.java test-ng/tests/org/openjdk/tests/java/util/streams/ops/ConcatOpTest.java test-ng/tests/org/openjdk/tests/java/util/streams/ops/ForEachOpTest.java test-ng/tests/org/openjdk/tests/java/util/streams/ops/StreamOpTestCase.java test-ng/tests/org/openjdk/tests/java/util/streams/ops/TeeOpTest.java
diffstat 14 files changed, 658 insertions(+), 150 deletions(-) [+]
line wrap: on
line diff
--- a/src/share/classes/java/util/Iterators.java	Fri Sep 21 11:58:54 2012 -0400
+++ b/src/share/classes/java/util/Iterators.java	Fri Sep 21 12:39:38 2012 -0400
@@ -25,6 +25,7 @@
 package java.util;
 
 import java.util.functions.*;
+import java.util.streams.Stream;
 
 /**
  * Utilities for Iterators.  All of these methods consume elements from the iterators passed to them!
@@ -119,4 +120,60 @@
     // @@@ toString
     // @@@ concat
     // @@@ zip
+
+    // @@@ Overloaded concat methods for various inputs
+
+    public static <T> Iterator<T> concat(final Iterator<? extends T> i1, final Iterator<? extends T> i2) {
+        Objects.requireNonNull(i1);
+        Objects.requireNonNull(i2);
+
+        return concat(Arrays.asList(i1, i2).iterator());
+    }
+
+    public static <T> Iterator<T> concat(final Iterator<? extends Iterator<? extends T>> iterators) {
+        Objects.requireNonNull(iterators);
+
+        if (!iterators.hasNext())
+            return Collections.emptyIterator();
+
+        return new Iterator<T>() {
+            private Iterator<? extends T> it = Objects.requireNonNull(iterators.next());
+            // Need to retain a reference to the last iterator used with next()
+            // so that remove() can use that reference for deferral and check for two or more calls,
+            // and because hasNext() may update "it" to the next iterator
+            private Iterator<? extends T> itForRemove = null;
+
+            @Override
+            public boolean hasNext() {
+                while (!it.hasNext()) {
+                    if (!iterators.hasNext()) {
+                        return false;
+                    }
+                    it = Objects.requireNonNull(iterators.next());
+                }
+
+                return true;
+            }
+
+            @Override
+            public T next() {
+                if (!hasNext()) {
+                    throw new NoSuchElementException();
+                }
+
+                itForRemove = it;
+                return it.next();
+            }
+
+            @Override
+            public void remove() {
+                if (itForRemove == null) {
+                    throw new IllegalStateException();
+                }
+
+                itForRemove.remove();
+                itForRemove = null;
+            }
+        };
+    }
 }
--- a/src/share/classes/java/util/streams/MapPipeline.java	Fri Sep 21 11:58:54 2012 -0400
+++ b/src/share/classes/java/util/streams/MapPipeline.java	Fri Sep 21 12:39:38 2012 -0400
@@ -92,6 +92,11 @@
     }
 
     @Override
+    public MapStream<K, V> concat(MapStream<K, V> other) {
+        return chainMap(ConcatOp.<K, V>makeMap(other));
+    }
+
+    @Override
     public Stream<K> keys() {
         return chainValue(new MapExtractKeysOp<K,V>());
     }
--- a/src/share/classes/java/util/streams/MapStream.java	Fri Sep 21 11:58:54 2012 -0400
+++ b/src/share/classes/java/util/streams/MapStream.java	Fri Sep 21 12:39:38 2012 -0400
@@ -146,6 +146,15 @@
      */
     MapStream<K, V> skip(int n);
 
+    /**
+     * Concatenate to the end of this stream.
+     * @@@ What about mergeWith?
+     *
+     * @param other the stream to concatenate.
+     * @return the concatenated stream.
+     */
+    MapStream<K, V> concat(MapStream<K, V> other);
+
     <A extends MapStream.Destination<K, V>> A into(A target);
 
     /**
--- a/src/share/classes/java/util/streams/Stream.java	Fri Sep 21 11:58:54 2012 -0400
+++ b/src/share/classes/java/util/streams/Stream.java	Fri Sep 21 12:39:38 2012 -0400
@@ -111,6 +111,14 @@
      */
     Stream<T> skip(int n);
 
+    /**
+     * Concatenate to the end of this stream.
+     *
+     * @param other the stream to concatenate.
+     * @return the concatenated stream.
+     */
+    Stream<T> concat(Stream<? extends T> other);
+
     <A extends Destination<? super T>> A into(A target);
 
     Object[] toArray();
--- a/src/share/classes/java/util/streams/ValuePipeline.java	Fri Sep 21 11:58:54 2012 -0400
+++ b/src/share/classes/java/util/streams/ValuePipeline.java	Fri Sep 21 12:39:38 2012 -0400
@@ -106,6 +106,11 @@
     }
 
     @Override
+    public Stream<U> concat(Stream<? extends U> other) {
+        return chainValue(ConcatOp.<U>make(other));
+    }
+
+    @Override
     public <A extends Destination<? super U>> A into(A target) {
         target.addAll(this);
         return target;
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/share/classes/java/util/streams/ops/ConcatOp.java	Fri Sep 21 12:39:38 2012 -0400
@@ -0,0 +1,185 @@
+/*
+ * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.  Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+package java.util.streams.ops;
+
+import java.util.*;
+import java.util.streams.*;
+
+public abstract class ConcatOp<T> implements StatefulOp<T, T, Void> {
+
+    private final StreamShape shape;
+
+    private ConcatOp(StreamShape shape) {
+        this.shape = shape;
+    }
+
+    @Override
+    public int getStreamFlags(int upstreamFlags) {
+        return upstreamFlags & ~(Stream.FLAG_SIZED | Stream.FLAG_UNKNOWN_MASK_V1);
+    }
+
+    @Override
+    public abstract Iterator<T> wrapIterator(Iterator<T> source);
+
+    @Override
+    public abstract Sink<T, ?, ?> wrapSink(Sink sink);
+
+    @Override
+    public StreamShape inputShape() {
+        return shape;
+    }
+
+    @Override
+    public StreamShape outputShape() {
+        return shape;
+    }
+
+    @Override
+    public <Z> TreeUtils.Node<T> computeParallel(ParallelOpHelper<T, Z> helper) {
+        // Get all stuff from upstream
+        TreeUtils.Node<T> upStreamNode = TreeUtils.collect(helper, false, false);
+
+        // Get stuff from concatenation
+        TreeUtils.Node<T> concatStreamNode = computeParallelFromConcatenatingStream();
+
+        // Combine
+        return TreeUtils.node(upStreamNode, concatStreamNode);
+    }
+
+    protected abstract TreeUtils.Node<T> computeParallelFromConcatenatingStream();
+
+    // @@@ Support Streamable
+    //     This will ensure the operation can be used in detached pipelines
+    //     and there is a choice to obtain the serial or parallel stream
+    //     There are still cases where Stream is useful e.g. from I/O sources
+    // @@@ Requires flags are available on Streamable for analysis of the pipeline
+    // @@@ Might be possible for some consolidation if Streamable was generic to Stream/MapStream
+    public static <T> ConcatOp<T> make(final Stream<? extends T> stream) {
+        return new ConcatOp<T>(StreamShape.VALUE) {
+
+            @Override
+            public Iterator<T> wrapIterator(Iterator<T> source) {
+                Objects.requireNonNull(source);
+
+                return Iterators.concat(source, stream.iterator());
+            }
+
+            @Override
+            public Sink<T, ?, ?> wrapSink(Sink sink) {
+                Objects.requireNonNull(sink);
+
+                return new Sink.ChainedValue<T>(sink) {
+                    @Override
+                    public void accept(T t) {
+                        downstream.accept(t);
+                    }
+
+                    @Override
+                    public void end() {
+                        // Pull from the concatenating stream to ensure sequential access
+                        // Note that stream.forEach(downstream) will not, in the parallel case,
+                        // guarantee an order, and stream.sequential().forEach(downstream) will
+                        // result in buffering of the stream contents
+                        Iterator<? extends T> i = stream.iterator();
+                        while (i.hasNext()) {
+                            downstream.accept(i.next());
+                        }
+                        downstream.end();
+                    }
+                };
+            }
+
+            @Override
+            protected TreeUtils.Node<T> computeParallelFromConcatenatingStream() {
+                // Get stuff from concat stream
+                if (stream.isParallel() && stream instanceof AbstractPipeline) {
+                    // @@@ Yuk! the cast sucks, but it avoids uncessary wrapping
+                    // @@@ Also dangerous as CollectorOp can only be used when the stream is parallel
+                    return ((AbstractPipeline<T, T>) stream).pipeline(TreeUtils.CollectorOp.<T>singleton());
+                }
+                else {
+                    // @@@ Yuk! too much copying
+                    final StreamBuilder<T> sb = StreamBuilders.make();
+
+                    // @@@ stream.into(sb) fails because the StreamBuilder does not implement the full contract
+                    // of Collection
+
+                    sb.begin(-1);
+                    Iterator<? extends T> i = stream.iterator();
+                    while (i.hasNext())
+                        sb.accept(i.next());
+                    sb.end();
+
+                    return TreeUtils.node(sb);
+                }
+            }
+
+        };
+    }
+
+    public static <K, V> ConcatOp<Mapping<K, V>> makeMap(final MapStream<K, V> stream) {
+        return new ConcatOp<Mapping<K, V>>(StreamShape.KEY_VALUE) {
+
+            @Override
+            public Iterator<Mapping<K, V>> wrapIterator(Iterator<Mapping<K, V>> source) {
+                Objects.requireNonNull(source);
+
+                return MapIterator.IteratorAdapter.adapt(Iterators.concat(source, stream.iterator()));
+            }
+
+            @Override
+            public Sink<Mapping<K, V>, ?, ?> wrapSink(Sink sink) {
+                Objects.requireNonNull(sink);
+
+                return new Sink.ChainedMap<K, V>(sink) {
+                    @Override
+                    public void accept(K k, V v) {
+                        downstream.accept(k, v);
+                    }
+
+                    @Override
+                    public void end() {
+                        // Pull from the concatenating stream to ensure sequential access
+                        // Note that stream.forEach(downstream) will not, in the parallel case,
+                        // guarantee an order, and stream.sequential().forEach(downstream) will
+                        // result in buffering of the stream contents
+                        MapIterator<K, V> i = stream.iterator();
+                        while (i.hasNext()) {
+                            downstream.accept(i.nextKey(), i.curValue());
+                        }
+                        downstream.end();
+                    }
+                };
+            }
+
+            @Override
+            protected TreeUtils.Node<Mapping<K, V>> computeParallelFromConcatenatingStream() {
+                // @@@ Not currently possible to create TreeUtils.Node from a MapStream source
+                throw new UnsupportedOperationException();
+            }
+        };
+    }
+
+}
--- a/src/share/classes/java/util/streams/ops/TreeUtils.java	Fri Sep 21 11:58:54 2012 -0400
+++ b/src/share/classes/java/util/streams/ops/TreeUtils.java	Fri Sep 21 12:39:38 2012 -0400
@@ -362,7 +362,7 @@
 
         @Override
         public Iterator<T> iterator() {
-            return concatenate(Arrays.stream(nodes).map(n -> n.iterator()).iterator());
+            return Iterators.concat(Arrays.stream(nodes).map(n -> n.iterator()).iterator());
         }
 
         @Override
@@ -481,33 +481,4 @@
             }
         }
     }
-
-    public static<T> Iterator<T> concatenate(final Iterator<Iterator<T>> iterators) {
-        if (!iterators.hasNext())
-            return Collections.emptyIterator();
-        return new Iterator<T>() {
-            private Iterator<? extends T> it = iterators.next();
-
-            @Override
-            public boolean hasNext() {
-                if (it.hasNext()) {
-                    return true;
-                } else if (!iterators.hasNext()) {
-                    return false;
-                } else {
-                    it = iterators.next();
-                    return hasNext();
-                }
-            }
-
-            @Override
-            public T next() {
-                if (hasNext()) {
-                    return it.next();
-                } else {
-                    throw new NoSuchElementException();
-                }
-            }
-        };
-    }
 }
--- a/test-ng/tests/org/openjdk/tests/java/util/LambdaTestHelpers.java	Fri Sep 21 11:58:54 2012 -0400
+++ b/test-ng/tests/org/openjdk/tests/java/util/LambdaTestHelpers.java	Fri Sep 21 12:39:38 2012 -0400
@@ -39,9 +39,13 @@
 public class LambdaTestHelpers {
     public static final String LONG_STRING = "When in the Course of human events it becomes necessary for one people to dissolve the political bands which have connected them with another and to assume among the powers of the earth, the separate and equal station to which the Laws of Nature and of Nature's God entitle them, a decent respect to the opinions of mankind requires that they should declare the causes which impel them to the separation.";
 
+    @SuppressWarnings("rawtypes")
     public static final Block bEmpty = x -> {  };
+    @SuppressWarnings("rawtypes")
     public static final BiBlock bBiEmpty = (x,y) -> { };
+    @SuppressWarnings("rawtypes")
     public static final Block bHashCode = x -> { Objects.hashCode(x); };
+    @SuppressWarnings("rawtypes")
     public static final BiBlock bBiHashCode = (x,y) -> { Objects.hash(x, y); };
     public static final Mapper<Integer, Integer> mZero = x -> 0;
     public static final Mapper<Integer, Integer> mId = x -> x;
@@ -215,6 +219,7 @@
     }
 
     @SafeVarargs
+    @SuppressWarnings("varargs")
     public static<T> void assertContents(Iterator<T> actual, T... expected) {
         assertContents(actual, Arrays.asList(expected).iterator());
     }
--- a/test-ng/tests/org/openjdk/tests/java/util/streams/MapStreamTestDataProvider.java	Fri Sep 21 11:58:54 2012 -0400
+++ b/test-ng/tests/org/openjdk/tests/java/util/streams/MapStreamTestDataProvider.java	Fri Sep 21 12:39:38 2012 -0400
@@ -87,17 +87,22 @@
         List<Object[]> list_ii = new ArrayList<>();
         List<Object[]> list_is = new ArrayList<>();
         for (Class<?> type : TYPES) {
+            @SuppressWarnings("unchecked")
             String name = type.getName();
             for (int size : SIZES) {
                 String range = 0 == size ? "[empty]" : String.format("[1..%d]", size);
 
+                @SuppressWarnings("unchecked")
+                Class<? extends Map<Integer,Integer>> mapTypeInteger = (Class<? extends Map<Integer,Integer>>) type;
                 list_ii.add(new Object[]{
                             String.format("%s%s%s", name, "<Integer,Integer>", range),
-                            new StreamOpTestCase.MapTestData<>(makeIntegerIntegerMap((Class<? extends Map<Integer,Integer>>) type, size))
+                            new StreamOpTestCase.MapTestData<>(makeIntegerIntegerMap(mapTypeInteger, size))
                       });
+                @SuppressWarnings("unchecked")
+                Class<? extends Map<Integer,String>> mapTypeString = (Class<? extends Map<Integer,String>>) type;
                 list_is.add(new Object[]{
                             String.format("%s%s%s", name, "<Integer,String>", range),
-                            new StreamOpTestCase.MapTestData<>(makeIntegerStringMap((Class<? extends Map<Integer,String>>) type, size))
+                            new StreamOpTestCase.MapTestData<>(makeIntegerStringMap(mapTypeString, size))
                       });
             }
         }
--- a/test-ng/tests/org/openjdk/tests/java/util/streams/ops/BiMatchOpTest.java	Fri Sep 21 11:58:54 2012 -0400
+++ b/test-ng/tests/org/openjdk/tests/java/util/streams/ops/BiMatchOpTest.java	Fri Sep 21 12:39:38 2012 -0400
@@ -89,6 +89,7 @@
     }
 
     @Test(dataProvider = "opMapsIS", dataProviderClass = MapStreamTestDataProvider.class)
+    @SuppressWarnings("unchecked")
     public void testOpsIS(String name, MapTestData<Integer, String> data) {
         for (BiPredicate<?, ?> p : ANY_PREDICATES) {
             for (MatchOp.MatchKind matchKind : MatchOp.MatchKind.values()) {
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/test-ng/tests/org/openjdk/tests/java/util/streams/ops/ConcatOpTest.java	Fri Sep 21 12:39:38 2012 -0400
@@ -0,0 +1,121 @@
+/*
+ * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.  Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+package org.openjdk.tests.java.util.streams.ops;
+
+import org.openjdk.tests.java.util.LambdaTestHelpers;
+import org.openjdk.tests.java.util.streams.StreamTestDataProvider;
+import org.testng.annotations.Test;
+
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.util.*;
+import java.util.functions.Factory;
+import java.util.streams.Stream;
+import java.util.streams.Streams;
+import java.util.streams.ops.ConcatOp;
+
+import static org.openjdk.tests.java.util.LambdaTestHelpers.*;
+
+@Test
+public class ConcatOpTest extends StreamOpTestCase {
+
+    public void testRawIterator() {
+        assertCountSum(Iterators.concat(countTo(0).iterator(), countTo(0).iterator()), 0, 0);
+        assertCountSum(Iterators.concat(countTo(4).iterator(), countTo(0).iterator()), 4, 10);
+        assertCountSum(Iterators.concat(countTo(0).iterator(), countTo(4).iterator()), 4, 10);
+        assertCountSum(Iterators.concat(countTo(4).iterator(), countTo(4).iterator()), 8, 20);
+
+        List<Iterator<Integer>> lis = new ArrayList<>();
+        for (int i = 1; i < 8; i += 2) {
+            lis.add(range(i, i + 1).iterator());
+        }
+        assertCountSum(Iterators.concat(lis.iterator()), 8, 36);
+    }
+
+    public void testRawIteratorRemove() {
+        List<List<Integer>> ls = Streams.repeat(2, 2).
+                map(LambdaTestHelpers::countTo).into(new ArrayList<List<Integer>>());
+
+        List<Iterator<Integer>> lis = ls.stream().
+                map(i -> i.iterator()).into(new ArrayList<Iterator<Integer>>());
+
+        Iterator<Integer> i = Iterators.concat(lis.iterator());
+        while (i.hasNext()) {
+            i.next();
+            i.remove();
+        }
+
+        assertTrue(ls.stream().allMatch(l -> l.isEmpty()));
+    }
+
+    @Test(expectedExceptions = {IllegalStateException.class})
+    public void testRawIteratorRemoveTwice() {
+        Iterator<Integer> i = Iterators.concat(countTo(2).iterator(), countTo(2).iterator());
+        i.next();
+        i.remove();
+        i.remove();
+    }
+
+    @Test(expectedExceptions = {IllegalStateException.class})
+    public void testRawIteratorRemoveFirst() {
+        Iterator<Integer> i = Iterators.concat(countTo(2).iterator(), countTo(2).iterator());
+        i.remove();
+    }
+
+    public void testConcat() {
+        assertContents(Collections.<Integer>emptyList().stream().
+                concat(Collections.<Integer>emptyList().stream()).iterator(),
+                       Collections.<Integer>emptyList().iterator());
+
+        assertContents(countTo(10).stream().
+                concat(Collections.<Integer>emptyList().stream()).iterator(),
+                       countTo(10).stream().iterator());
+
+        assertContents(countTo(5).stream().
+                concat(range(6, 10).stream()).iterator(),
+                       countTo(10).stream().iterator());
+
+        assertContents(countTo(2).stream().
+                concat(range(3, 4).stream()).
+                concat(range(5, 6).stream()).
+                concat(range(7, 8).stream()).
+                concat(range(9, 10).stream()).iterator(),
+                       countTo(10).stream().iterator());
+    }
+
+    @Test(dataProvider = "opArrays", dataProviderClass = StreamTestDataProvider.class)
+    public void testOpsSequential(String name, TestData<Integer> data) {
+        testUsingData(data).
+                excerciseSingleOpFactory(() -> ConcatOp.make(data.seqStream()));
+    }
+
+    @Test(dataProvider = "opArrays", dataProviderClass = StreamTestDataProvider.class)
+    public void testOpsParallel(String name, TestData<Integer> data) {
+        testUsingData(data).
+                excerciseSingleOpFactory(() -> ConcatOp.make(data.parStream()));
+    }
+}
+
--- a/test-ng/tests/org/openjdk/tests/java/util/streams/ops/ForEachOpTest.java	Fri Sep 21 11:58:54 2012 -0400
+++ b/test-ng/tests/org/openjdk/tests/java/util/streams/ops/ForEachOpTest.java	Fri Sep 21 12:39:38 2012 -0400
@@ -73,18 +73,21 @@
     }
 
     @Test(dataProvider = "opArrays", dataProviderClass = StreamTestDataProvider.class)
+    @SuppressWarnings("unchecked")
     public void testOps(String name, TestData<Integer> data) {
        Object result = exerciseOps(data, ForEachOp.<Integer>make(bEmpty));
        assertEquals(result, null);
     }
 
     @Test(dataProvider = "opMapsII", dataProviderClass = MapStreamTestDataProvider.class)
+    @SuppressWarnings("unchecked")
     public void testOpsII(String name, MapTestData<Integer,Integer> data) {
         Object result = exerciseOps(data, ForEachOp.<Integer,Integer>make(bBiEmpty));
         assertEquals(result, null);
     }
 
     @Test(dataProvider = "opMapsIS", dataProviderClass = MapStreamTestDataProvider.class)
+    @SuppressWarnings("unchecked")
     public void testOpsIS(String name, MapTestData<Integer,String> data) {
         Object result = exerciseOps(data, ForEachOp.<Integer,String>make(bBiEmpty));
         assertEquals(result, null);
--- a/test-ng/tests/org/openjdk/tests/java/util/streams/ops/StreamOpTestCase.java	Fri Sep 21 11:58:54 2012 -0400
+++ b/test-ng/tests/org/openjdk/tests/java/util/streams/ops/StreamOpTestCase.java	Fri Sep 21 12:39:38 2012 -0400
@@ -24,6 +24,7 @@
  */
 package org.openjdk.tests.java.util.streams.ops;
 
+import org.openjdk.tests.java.util.LambdaTestHelpers;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
@@ -35,6 +36,7 @@
 import java.util.functions.BiPredicate;
 import java.util.functions.BiBlock;
 import java.util.functions.Block;
+import java.util.functions.Factory;
 import java.util.streams.*;
 import java.util.streams.ops.IntermediateOp;
 import java.util.streams.ops.ParallelOp;
@@ -49,90 +51,198 @@
 @Test
 public abstract class StreamOpTestCase extends Assert {
 
+    protected static <T, U> StreamResult<U> exerciseOps(TestData<T> data, IntermediateOp... ops) {
+        return testUsingData(data).excerciseOps(ops);
+    }
+
+    protected static <T> IntermediateOpTestBuilder<T> testUsingData(TestData<T> data) {
+        return new IntermediateOpTestBuilder<>(data);
+    }
+
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    public static enum IntermediateOpTest {
+        // Create a sink and wrap it
+        DATA_FOR_EACH_TO_WRAPPED_SINK {
+            boolean isApplicable(IntermediateOp[] ops) {
+                return !isShortCircuit(ops);
+            }
+
+            <T> void run(TestData<T> data, Sink sink, IntermediateOp[] ops) {
+                Sink<T, ?, ?> wrapped = sink(sink, ops);
+                wrapped.begin(-1);
+                data.forEach(wrapped);
+                wrapped.end();
+            }
+        },
+
+        // Wrap with SequentialPipeline.op, and iterate in push mode
+        STREAM_FOR_EACH {
+            <T> void run(TestData<T> data, Sink sink, IntermediateOp[] ops) {
+                stream(data.seq(ops)).forEach(sink);
+            }
+        },
+
+        // Wrap as stream, and iterate in pull mode
+        STREAM_ITERATOR {
+            <T> void run(TestData<T> data, Sink sink, IntermediateOp[] ops) {
+                for (Iterator<?> seqIter = data.seq(ops).iterator(); seqIter.hasNext(); )
+                    sink.accept(seqIter.next());
+            }
+        },
+
+        // Wrap as stream, and iterate in mixed mode
+        STREAM_ITERATOR_FOR_EACH {
+            <T> void run(TestData<T> data, Sink sink, IntermediateOp[] ops) {
+                Stream<?> stream = stream(data.seq(ops));
+                Iterator<?> iter = stream.iterator();
+                if (iter.hasNext())
+                    sink.accept(iter.next());
+                stream.forEach(sink);
+            }
+        },
+
+        // Wrap as parallel stream + sequential
+        PAR_STREAM_SEQUENTIAL_FOR_EACH {
+            <T> void run(TestData<T> data, Sink sink, IntermediateOp[] ops) {
+                stream(data.par(ops)).sequential().forEach(sink);
+            }
+        },
+
+        // Wrap as parallel stream + toArray
+        PAR_STREAM_TO_ARRAY {
+            <T> void run(TestData<T> data, Sink sink, IntermediateOp[] ops) {
+                for (Object t : stream(data.par(ops)).toArray())
+                    sink.accept(t);
+            }
+        },
+
+        // Wrap as parallel stream + into
+        PAR_STREAM_SEQUENTIAL_INTO {
+            <T> void run(TestData<T> data, Sink sink, IntermediateOp[] ops) {
+                ArrayList list = stream(data.par(ops)).sequential().into(new ArrayList());
+                for (Object u : list)
+                    sink.accept(u);
+            }
+        },
+
+        // For stateful ops, validate Node contents with fake ParallelHelper
+        ONE_STATEFUL_OP_COMPUTE_PARALLEL {
+            boolean isApplicable(IntermediateOp[] ops) {
+                return ops.length == 1 && ops[0].isStateful();
+            }
+
+            <T> void run(TestData<T> data, Sink sink, IntermediateOp[] ops) {
+                StatefulOp<T, ?, ?> sop = (StatefulOp<T, ?, ?>) ops[0];
+                for (Object u : sop.computeParallel(new DummyParallelOpHelper<>(data)))
+                    sink.accept(u);
+            }
+        },
+
+        // More ways to iterate the PSS: iterate result of op
+        // Extends testing to test whether computation happens in- or out-of-thread
+        ;
+
+        boolean isApplicable(IntermediateOp[] ops) {
+            return true;
+        }
+
+        abstract <T> void run(TestData<T> data, Sink s, IntermediateOp[] ops);
+    }
+
+    public static class IntermediateOpTestBuilder<T> {
+        final TestData<T> data;
+        EnumSet<IntermediateOpTest> testSet = EnumSet.allOf(IntermediateOpTest.class);
+        // @@@ Add no-op block
+        @SuppressWarnings("unchecked")
+        Block<TestData<T>> before = LambdaTestHelpers.bEmpty;
+        @SuppressWarnings("unchecked")
+        Block<TestData<T>> after = LambdaTestHelpers.bEmpty;
+
+        private IntermediateOpTestBuilder(TestData<T> data) {
+            this.data = Objects.requireNonNull(data);
+        }
+
+        public IntermediateOpTestBuilder<T> before(Block<TestData<T>> before) {
+            this.before = Objects.requireNonNull(before);
+            return this;
+        }
+
+        public IntermediateOpTestBuilder<T> after(Block<TestData<T>> after) {
+            this.after = Objects.requireNonNull(after);
+            return this;
+        }
+
+        public IntermediateOpTestBuilder<T> without(IntermediateOpTest... tests) {
+            return without(Arrays.asList(tests));
+        }
+
+        public IntermediateOpTestBuilder<T> without(Collection<IntermediateOpTest> tests) {
+            testSet = EnumSet.complementOf(EnumSet.copyOf(tests));
+            return this;
+        }
+
+        public IntermediateOpTestBuilder<T> with(IntermediateOpTest... tests) {
+            return with(Arrays.asList(tests));
+        }
+
+        public IntermediateOpTestBuilder<T> with(Collection<IntermediateOpTest> tests) {
+            testSet = EnumSet.copyOf(tests);
+            return this;
+        }
+
+        public <U> StreamResult<U> excerciseOps(IntermediateOp... ops) {
+            Objects.requireNonNull(ops);
+            return excerciseMultipleOpsFactory(() -> ops);
+        }
+
+        @SuppressWarnings("rawtypes")
+        public <U> StreamResult<U> excerciseSingleOpFactory(Factory<IntermediateOp> fop) {
+            Objects.requireNonNull(fop);
+            return excerciseMultipleOpsFactory(() -> new IntermediateOp[] { fop.make() });
+        }
+
+        @SuppressWarnings("rawtypes")
+        public <U> StreamResult<U> excerciseMultipleOpsFactory(Factory<IntermediateOp[]> fops) {
+            Objects.requireNonNull(fops);
+            return exerciseOps(data, testSet, before, after, fops);
+        }
+    }
+
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    protected static <T, U> StreamResult<U> exerciseOps(TestData<T> data,
+                                                        EnumSet<IntermediateOpTest> testSet,
+                                                        Block<TestData<T>> before,
+                                                        Block<TestData<T>> after,
+                                                        Factory<IntermediateOp[]> fops) {
+        StreamResult<U> refResult = new StreamResult<>(data.size());
+
+        // First pass -- grab an iterator and wrap it, and call that the reference result
+        before.apply(data);
+        Iterator<U> it = (Iterator<U>) data.iterator(fops.make());
+        while (it.hasNext())
+            refResult.accept(it.next());
+        after.apply(data);
+
+        for (IntermediateOpTest test : testSet) {
+            IntermediateOp[] ops = fops.make();
+            if (test.isApplicable(ops)) {
+                before.apply(data);
+                assertMatches(refResult, sink -> test.run(data, sink, ops));
+                after.apply(data);
+            }
+        }
+
+        return refResult;
+    }
+
+    @SuppressWarnings("rawtypes")
     protected static<T> void assertMatches(StreamResult<T> refResult, Block<Sink> block) {
         StreamResult<T> newResult = new StreamResult<>();
         block.apply(newResult);
         assertEquals(refResult, newResult);
     }
 
-    @SuppressWarnings({ "raw", "unchecked" })
-    protected static <T, U> StreamResult<U> exerciseOps(TestData<T> data,
-                                                        IntermediateOp<T, U> op) {
-        StreamResult<U> refResult = exerciseOps(data, new IntermediateOp[] { op });
-
-        // For stateful ops, validate Node contents with fake ParallelHelper
-        if (op.isStateful()) {
-            assertMatches(refResult, sink -> {
-                StatefulOp<T, U, ?> sop = (StatefulOp<T, U, ?>) op;
-                for (U u : sop.computeParallel(new DummyParallelOpHelper<>(data)))
-                    sink.accept(u);
-            });
-        }
-
-        return refResult;
-    }
-
-    @SuppressWarnings({ "raw", "unchecked" })
-    protected static <T, U> StreamResult<U> exerciseOps(TestData<T> data,
-                                                        IntermediateOp... ops) {
-        StreamResult<U> refResult = new StreamResult<>(data.size());
-
-        // First pass -- grab an iterator and wrap it, and call that the reference result
-        Iterator<U> it = (Iterator<U>) data.iterator(ops);
-        while (it.hasNext())
-            refResult.accept(it.next());
-
-        // Second pass -- create a sink and wrap it
-        // Only test push with sink if there are no short-circuit ops
-        if (!isShortCircuit(ops)) {
-            assertMatches(refResult, sink -> {
-                Sink<T, ?, ?> wrapped = sink(sink, ops);
-                wrapped.begin(-1);
-                data.forEach(wrapped);
-                wrapped.end();
-            });
-        }
-
-        // Third pass -- wrap with SequentialPipeline.op, and iterate in push mode
-        assertMatches(refResult, sink -> { stream(data.seq(ops)).forEach(sink); });
-
-        // Wrap as stream, and iterate in pull mode
-        assertMatches(refResult, sink -> {
-            for (Iterator<?> seqIter = data.seq(ops).iterator(); seqIter.hasNext(); )
-                sink.accept(seqIter.next());
-        });
-
-        // Wrap as stream, and iterate in mixed mode
-        assertMatches(refResult, sink -> {
-            Stream<?> stream = stream(data.seq(ops));
-            Iterator<?> iter = stream.iterator();
-            if (iter.hasNext())
-                sink.accept(iter.next());
-            stream.forEach(sink);
-        });
-
-        // Wrap as parallel stream + sequential
-        assertMatches(refResult, sink -> { stream(data.par(ops)).sequential().forEach(sink); });
-
-        // Wrap as parallel stream + toArray
-        assertMatches(refResult, sink -> {
-            for (Object t : stream(data.par(ops)).toArray())
-                sink.accept(t);
-        });
-
-        // Wrap as parallel stream + into
-        assertMatches(refResult, sink -> {
-            ArrayList list = stream(data.par(ops)).sequential().into(new ArrayList());
-            for (Object u : list)
-                sink.accept(u);
-        });
-
-        // More ways to iterate the PSS: iterate result of op
-        // Extends testing to test whether computation happens in- or out-of-thread
-
-        return refResult;
-    }
-
+    @SuppressWarnings("rawtypes")
     private static boolean isShortCircuit(IntermediateOp... ops) {
         for (IntermediateOp op : ops) {
             if (op.isShortCircuit()) {
@@ -142,15 +252,17 @@
         return false;
     }
 
+    @SuppressWarnings("rawtypes")
     protected <T, U> U exerciseOps(TestData<T> data, TerminalOp<T, U> terminal) {
         return exerciseOps(data, terminal, new IntermediateOp[0]);
     }
 
+    @SuppressWarnings("rawtypes")
     protected <T, U> U exerciseOps(TestData<T> data, TerminalOp<T, U> terminal, IntermediateOp... ops) {
         return exerciseOps(data, (u, v) -> Objects.equals(u,v), terminal, ops);
     }
 
-    @SuppressWarnings({ "raw", "unchecked" })
+    @SuppressWarnings({ "rawtypes", "unchecked" })
     protected static <T, U> U exerciseOps(TestData<T> data,
                                           BiPredicate<U, U> equalator,
                                           TerminalOp<T, U> terminalOp,
@@ -191,10 +303,12 @@
         return answer;
     }
 
+    @SuppressWarnings("rawtypes")
     protected <K, V, U> U exerciseOps(MapTestData<K,V> data, TerminalOp<Mapping<K,V>, U> terminal) {
         return exerciseOps(data, terminal, new IntermediateOp[0]);
     }
 
+    @SuppressWarnings("rawtypes")
     protected <K, V, U> U exerciseOps(MapTestData<K,V> data, TerminalOp<Mapping<K,V>, U> terminal, IntermediateOp... ops) {
         // @@@ Compiler bug if lambda literal used. Fine as a local.
         BiPredicate<U,U> equalator = (u, v) -> Objects.equals(u,v);
@@ -332,12 +446,13 @@
         }
 
         @Override
-        @SuppressWarnings({ "raw", "unchecked" })
+        @SuppressWarnings({ "rawtypes", "unchecked" })
         public Sink<T, ?, ?> wrapSink(Sink sink) {
             return sink;
         }
 
         @Override
+        @SuppressWarnings("rawtypes")
         public Iterator wrapIterator(Iterator it) {
             return it;
         }
@@ -353,7 +468,7 @@
         }
     }
 
-    @SuppressWarnings({ "raw", "unchecked" })
+    @SuppressWarnings({ "rawtypes", "unchecked" })
     private static<T> AbstractPipeline<?, T> chain(AbstractPipeline pipe, IntermediateOp<?, T> op) {
         switch (op.outputShape()) {
             case VALUE: return new ValuePipeline(pipe, op);
@@ -362,7 +477,7 @@
         }
     }
 
-    @SuppressWarnings({ "raw", "unchecked" })
+    @SuppressWarnings({ "rawtypes", "unchecked" })
     private static<U> U chain(AbstractPipeline pipe, TerminalOp<?, U> op) {
         switch (pipe.getShape()) {
             case VALUE: return (U) ((ValuePipeline) pipe).pipeline(op);
@@ -371,30 +486,31 @@
         }
     }
 
-    @SuppressWarnings({ "raw", "unchecked" })
+    @SuppressWarnings({ "rawtypes", "unchecked" })
     private static AbstractPipeline<?, ?> chain(AbstractPipeline pipe, IntermediateOp... ops) {
         for (IntermediateOp op : ops)
             pipe = chain(pipe, op);
         return pipe;
     }
 
-    @SuppressWarnings({ "raw", "unchecked" })
+    @SuppressWarnings({ "rawtypes", "unchecked" })
     private static<U> U chain(AbstractPipeline pipe, TerminalOp<?, U> terminal, IntermediateOp... ops) {
         for (IntermediateOp op : ops)
             pipe = chain(pipe, op);
         return chain(pipe, terminal);
     }
 
-    @SuppressWarnings({ "raw", "unchecked" })
+    @SuppressWarnings({ "rawtypes", "unchecked" })
     protected static<U> Stream<U> stream(AbstractPipeline<?, U> pipe) {
         return (Stream<U>) pipe;
     }
 
-    @SuppressWarnings({ "raw", "unchecked" })
+    @SuppressWarnings({ "rawtypes", "unchecked" })
     protected static<K,V> MapStream<K,V> mapStream(AbstractPipeline<?, Mapping<K,V>> pipe) {
         return (MapStream<K,V>) pipe;
     }
 
+    @SuppressWarnings("rawtypes")
     protected static Sink sink(Sink sink, IntermediateOp[] ops) {
         for (int i=ops.length-1; i >= 0; i--)
             sink = ops[i].wrapSink(sink);
@@ -430,7 +546,7 @@
             return target;
         }
 
-        @SuppressWarnings({ "raw", "unchecked" })
+        @SuppressWarnings({ "rawtypes", "unchecked" })
         Iterator<?> iterator(IntermediateOp... ops) default {
             Iterator<T> iterator = iterator();
             for (IntermediateOp op : ops)
@@ -438,32 +554,32 @@
             return iterator;
         }
 
-        @SuppressWarnings({ "raw", "unchecked" })
+        @SuppressWarnings({ "rawtypes", "unchecked" })
         AbstractPipeline<?, ?> seq(IntermediateOp... ops) default {
             return chain(seq(), ops);
         }
 
-        @SuppressWarnings({ "raw", "unchecked" })
+        @SuppressWarnings({ "rawtypes", "unchecked" })
         AbstractPipeline<?, ?> par(IntermediateOp... ops) default {
             return chain(par(), ops);
         }
 
-        @SuppressWarnings({ "raw", "unchecked" })
+        @SuppressWarnings({ "rawtypes", "unchecked" })
         <U> U seq(TerminalOp<T, U> op) default {
             return chain(seq(), op);
         }
 
-        @SuppressWarnings({ "raw", "unchecked" })
+        @SuppressWarnings({ "rawtypes", "unchecked" })
         <U> U par(TerminalOp<T, U> op) default {
             return chain(par(), op);
         }
 
-        @SuppressWarnings({ "raw", "unchecked" })
+        @SuppressWarnings({ "rawtypes", "unchecked" })
         <U> U seq(TerminalOp<T, U> terminal, IntermediateOp... ops) default {
             return chain(seq(), terminal, ops);
         }
 
-        @SuppressWarnings({ "raw", "unchecked" })
+        @SuppressWarnings({ "rawtypes", "unchecked" })
         <U> U par(TerminalOp<T, U> terminal, IntermediateOp... ops) default {
             return chain(par(), terminal, ops);
         }
@@ -477,13 +593,13 @@
         }
 
         @Override
-        @SuppressWarnings({ "raw", "unchecked" })
+        @SuppressWarnings({ "rawtypes", "unchecked" })
         public AbstractPipeline<?, T> seq() {
             return (AbstractPipeline<?, T>) Arrays.stream(array);
         }
 
         @Override
-        @SuppressWarnings({ "raw", "unchecked" })
+        @SuppressWarnings({ "rawtypes", "unchecked" })
         public AbstractPipeline<?, T> par() {
             return (AbstractPipeline<?, T>) Arrays.parallel(array);
         }
@@ -523,13 +639,13 @@
         }
 
         @Override
-        @SuppressWarnings({ "raw", "unchecked" })
+        @SuppressWarnings({ "rawtypes", "unchecked" })
         public AbstractPipeline<?, T> seq() {
             return (AbstractPipeline<?, T>) collection.stream();
         }
 
         @Override
-        @SuppressWarnings({ "raw", "unchecked" })
+        @SuppressWarnings({ "rawtypes", "unchecked" })
         public AbstractPipeline<?, T> par() {
             return (AbstractPipeline<?, T>) collection.parallel();
         }
@@ -540,7 +656,7 @@
         }
 
         @Override
-        @SuppressWarnings({ "raw", "unchecked" })
+        @SuppressWarnings({ "rawtypes", "unchecked" })
         public Spliterator<T> spliterator() {
             // @@@ FIXME!
             return Arrays.spliterator((T[]) collection.toArray());
@@ -570,13 +686,13 @@
         }
 
         @Override
-        @SuppressWarnings({ "raw", "unchecked" })
+        @SuppressWarnings({ "rawtypes", "unchecked" })
         public AbstractPipeline<?, Mapping<K,V>> seq() {
             return (AbstractPipeline<?, Mapping<K,V>>) map.stream();
         }
 
         @Override
-        @SuppressWarnings({ "raw", "unchecked" })
+        @SuppressWarnings({ "rawtypes", "unchecked" })
         public AbstractPipeline<?, Mapping<K,V>> par() {
             return (AbstractPipeline<?, Mapping<K,V>>) map.parallel();
         }
@@ -587,7 +703,7 @@
         }
 
         @Override
-        @SuppressWarnings({ "raw", "unchecked" })
+        @SuppressWarnings({ "rawtypes", "unchecked" })
         public Spliterator<Mapping<K,V>> spliterator() {
             // @@@ FIXME!
             return Arrays.spliterator((Mapping<K,V>[]) map.entrySet().toArray());
--- a/test-ng/tests/org/openjdk/tests/java/util/streams/ops/TeeOpTest.java	Fri Sep 21 11:58:54 2012 -0400
+++ b/test-ng/tests/org/openjdk/tests/java/util/streams/ops/TeeOpTest.java	Fri Sep 21 12:39:38 2012 -0400
@@ -24,28 +24,27 @@
  */
 package org.openjdk.tests.java.util.streams.ops;
 
-import java.util.Arrays;
-import java.util.Optional;
-import java.util.streams.TerminalSink;
+import org.openjdk.tests.java.util.streams.StreamTestDataProvider;
 import org.testng.annotations.Test;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.functions.Block;
+import java.util.streams.TerminalSink;
+import java.util.streams.ops.MapOp;
 import java.util.streams.ops.TeeOp;
 import java.util.streams.ops.ToArrayOp;
-import java.util.streams.ops.SeedlessFoldOp;
-import java.util.streams.ops.MapOp;
-
-import org.openjdk.tests.java.util.streams.StreamTestDataProvider;
-
 
 import static org.openjdk.tests.java.util.LambdaTestHelpers.*;
 
 /**
  * TeeOpTest
- *
  */
 public class TeeOpTest extends StreamOpTestCase {
     public void testRawIterator() {
-        TerminalSink<Integer,Object[]> copy = ToArrayOp.<Integer>singleton().sink();
+        TerminalSink<Integer, Object[]> copy = ToArrayOp.<Integer>singleton().sink();
         copy.begin(-1);
         assertCountSum(TeeOp.iterator(countTo(0).iterator(), copy), 0, 0);
         copy.end();
@@ -63,7 +62,7 @@
     }
 
     public void testTee() {
-        TerminalSink<Integer,Object[]> copy = ToArrayOp.<Integer>singleton().sink();
+        TerminalSink<Integer, Object[]> copy = ToArrayOp.<Integer>singleton().sink();
         copy.begin(-1);
         assertCountSum(countTo(0).stream().tee(copy), 0, 0);
         copy.end();
@@ -81,12 +80,30 @@
     }
 
     @Test(dataProvider = "opArrays", dataProviderClass = StreamTestDataProvider.class)
-    public void testOps(String name, TestData<Integer> data) {
-        TerminalSink<Integer,Optional<Integer>> fold = new SeedlessFoldOp<>(rPlus).sink();
-        fold.begin(-1);
-        StreamResult<Integer> result = exerciseOps(data, new TeeOp<>(fold));
-        assertContentsUnordered(result, data);
+    public void testOps(String name, final TestData<Integer> data) {
+        class RecordingBlock implements Block<Integer> {
+            List<Integer> list;
 
-        fold.end();
+            void before(TestData<Integer> td) {
+                // Tee block can be called concurrently
+                list = Collections.<Integer>synchronizedList(new ArrayList<>());
+            }
+
+            public void apply(Integer t) {
+                list.add(t);
+            }
+
+            void after(TestData<Integer> td) {
+                // No guarantees in parallel tests that calls to tee block will
+                // be in the encounter order, if defined, of the data
+                assertContentsUnordered(list, data.into(new ArrayList<Integer>()));
+            }
+        }
+
+        final RecordingBlock b = new RecordingBlock();
+        testUsingData(data).
+                before(b::before).
+                after(b::after).
+                excerciseOps(new TeeOp<>(b));
     }
 }