changeset 6347:38236c556c72

Provide flag information to ops through wrapIterator/wrapSink, so they can use it to optimize the pipeline formation. Contributed-By: paul.sandoz@oracle.com
author briangoetz
date Wed, 24 Oct 2012 15:39:53 -0400
parents c01df4ef1790
children e40752a8afe3
files src/share/classes/java/util/streams/AbstractPipeline.java src/share/classes/java/util/streams/ops/ConcatOp.java src/share/classes/java/util/streams/ops/CumulateOp.java src/share/classes/java/util/streams/ops/FilterOp.java src/share/classes/java/util/streams/ops/FlagDeclaringOp.java src/share/classes/java/util/streams/ops/FlatMapOp.java src/share/classes/java/util/streams/ops/IntermediateOp.java src/share/classes/java/util/streams/ops/LimitOp.java src/share/classes/java/util/streams/ops/MapOp.java src/share/classes/java/util/streams/ops/SkipOp.java src/share/classes/java/util/streams/ops/SortedOp.java src/share/classes/java/util/streams/ops/StatefulOp.java src/share/classes/java/util/streams/ops/TeeOp.java src/share/classes/java/util/streams/ops/UniqOp.java test-ng/tests/org/openjdk/tests/java/util/streams/ops/FlagOpTest.java test-ng/tests/org/openjdk/tests/java/util/streams/ops/StreamOpTestCase.java test-ng/tests/org/openjdk/tests/java/util/streams/ops/UniqOpTest.java
diffstat 17 files changed, 258 insertions(+), 132 deletions(-) [+]
line wrap: on
line diff
--- a/src/share/classes/java/util/streams/AbstractPipeline.java	Wed Oct 24 14:41:29 2012 -0400
+++ b/src/share/classes/java/util/streams/AbstractPipeline.java	Wed Oct 24 15:39:53 2012 -0400
@@ -42,6 +42,7 @@
     protected final StreamAccessor<?> source;
     protected final IntermediateOp<E_IN, E_OUT> op;
     protected final int depth;
+    protected final int combinedOpsFlags;
 
     /**
      * If non-{@code null} then we are in serial iteration mode.
@@ -58,6 +59,7 @@
         this.op = null;
         this.upstream = null;
         this.depth = 0;
+        this.combinedOpsFlags = StreamOpFlags.INITIAL_OPS_VALUE;
     }
 
     /**
@@ -71,6 +73,7 @@
         this.op = Objects.requireNonNull(op);
         this.source = upstream.source;
         this.depth = upstream.depth + 1;
+        this.combinedOpsFlags = StreamOpFlags.combineOpFlags(op.getOpFlags(), upstream.combinedOpsFlags);
         assert upstream.getShape() == op.inputShape();
         assert (upstream.depth == 0) ^ (upstream.op != null);
     }
@@ -80,7 +83,7 @@
     }
 
     protected<R> R evaluate(TerminalOp<E_OUT, R> terminal) {
-        // @@@ If the source size estimate is small, don't bother going parallel
+        // @@@ NYI If the source size estimate is small, don't bother going parallel
         if (source.isParallel()) {
             return evaluateParallel(terminal);
         }
@@ -94,17 +97,18 @@
             return evaluateSerial(terminal);
         }
 
-        final IntermediateOp[] ops = ops();
-        int fromOp = 0;
-        int upToOp = 0;
+        final AbstractPipeline[] pipes = pipes();
+        // Start from 2nd element since 1st is the head containing the source
+        int fromOp = 1;
+        int upToOp = 1;
         StreamAccessor<?> accessor = source;
         while (true) {
-            while (upToOp < ops.length && !ops[upToOp].isStateful())
+            while (upToOp < pipes.length && !pipes[upToOp].op.isStateful())
                 upToOp++;
 
-            if (upToOp < ops.length) {
-                StatefulOp op = (StatefulOp) ops[upToOp];
-                Node<?> intermediateResult = evaluateParallel(accessor, ops, fromOp, upToOp, op);
+            if (upToOp < pipes.length) {
+                StatefulOp op = (StatefulOp) pipes[upToOp].op;
+                Node<?> intermediateResult = evaluateParallel(accessor, pipes, fromOp, upToOp, op);
 
                 // @@@ Inherit other flags from pipeline e.g. the intermediate result may be sorted and/or distinct,
                 //     and is ordered
@@ -113,19 +117,21 @@
                 fromOp = ++upToOp;
             }
             else {
-                return evaluateParallel(accessor, ops, fromOp, upToOp, terminal);
+                return evaluateParallel(accessor, pipes, fromOp, upToOp, terminal);
             }
         }
     }
 
-    private <R> Node<R> evaluateParallel(StreamAccessor source, IntermediateOp[] ops, int from, int to,
-                                                   StatefulOp<?, R> terminal) {
-        return (Node<R>) terminal.evaluateParallel(new ParallelImplPipelineHelper(source, ops, from, to));
+    private <R> Node<R> evaluateParallel(StreamAccessor source,
+                                         AbstractPipeline[] pipes, int from, int upTo,
+                                         StatefulOp<?, R> terminal) {
+        return (Node<R>) terminal.evaluateParallel(new ParallelImplPipelineHelper(source, pipes, from, upTo));
     }
 
-    private <R> R evaluateParallel(StreamAccessor<?> source, IntermediateOp[] ops, int from, int to,
+    private <R> R evaluateParallel(StreamAccessor<?> source,
+                                   AbstractPipeline[] pipes, int from, int upTo,
                                    TerminalOp<E_OUT, R> terminal) {
-        return terminal.evaluateParallel(new ParallelImplPipelineHelper<>(source, ops, from, to));
+        return terminal.evaluateParallel(new ParallelImplPipelineHelper<>(source, pipes, from, upTo));
     }
 
     protected <R> R evaluateSerial(TerminalOp<E_OUT, R> terminal) {
@@ -134,41 +140,36 @@
 
     static abstract class AbstractPipelineHelper<P_IN, P_OUT> implements PipelineHelper<P_IN, P_OUT> {
         protected final StreamAccessor<P_IN> source;
-        final IntermediateOp[] ops;
+        final AbstractPipeline[] pipes;
         final int from;
-        final int to;
-        final int opsFlags;
-        final int flags;
+        final int upTo;
+        final int sourceAndOpsFlags;
 
-        AbstractPipelineHelper(StreamAccessor<P_IN> source, IntermediateOp[] ops) {
-            this(source, ops, 0, ops.length);
+        AbstractPipelineHelper(StreamAccessor<P_IN> source, AbstractPipeline[] pipes) {
+            // Start from 2nd element since 1st is the head containing the source
+            this(source, pipes, 1, pipes.length);
         }
 
-        AbstractPipelineHelper(StreamAccessor<P_IN> source, IntermediateOp[] ops, int from, int to) {
-            int opsFlags = StreamOpFlags.INITIAL_OPS_VALUE;
-            for (int i = from; i < to; i++) {
-                int opFlags = ops[i].getOpFlags();
-                opsFlags = StreamOpFlags.combineOpFlags(opFlags, opsFlags);
-            }
-
+        AbstractPipelineHelper(StreamAccessor<P_IN> source, AbstractPipeline[] pipes, int from, int upTo) {
             this.source = source;
-            this.ops = ops;
+            this.pipes = pipes;
             this.from = from;
-            this.to = to;
-            this.opsFlags = opsFlags;
-            this.flags = StreamOpFlags.combineStreamFlags(source.getStreamFlags(), opsFlags);
+            this.upTo = upTo;
+            this.sourceAndOpsFlags = StreamOpFlags.combineStreamFlags(
+                    source.getStreamFlags(),
+                    pipes[upTo - 1].combinedOpsFlags);
         }
 
         protected boolean isOutputSizeKnown() {
-            return StreamOpFlags.SIZED.isKnown(flags);
+            return StreamOpFlags.SIZED.isKnown(sourceAndOpsFlags);
         }
 
         protected boolean isShortCircuit() {
-            return StreamOpFlags.SHORT_CIRCUIT.isKnown(flags);
+            return StreamOpFlags.SHORT_CIRCUIT.isKnown(sourceAndOpsFlags);
         }
 
         protected boolean hasZeroDepth() {
-            return to == from;
+            return upTo == from;
         }
 
         @Override
@@ -182,7 +183,7 @@
 
         @Override
         public int getStreamFlags() {
-            return flags;
+            return sourceAndOpsFlags;
         }
 
         @Override
@@ -192,15 +193,17 @@
 
         @Override
         public boolean isOutputInfinitelySized() {
-            return StreamOpFlags.INFINITELY_SIZED.isKnown(flags);
+            return StreamOpFlags.INFINITELY_SIZED.isKnown(sourceAndOpsFlags);
         }
 
         @Override
         public Sink<P_IN> wrapSink(Sink sink) {
             Objects.requireNonNull(sink);
 
-            for (int i = to - 1; i >= from; i--) {
-                sink = ops[i].wrapSink(sink);
+            for (int i = upTo - 1; i >= from; i--) {
+                sink = pipes[i].op.wrapSink(
+                        StreamOpFlags.combineStreamFlags(source.getStreamFlags(), pipes[i - 1].combinedOpsFlags),
+                        sink);
             }
             return sink;
         }
@@ -209,8 +212,10 @@
         public Iterator<P_OUT> wrapIterator(Iterator it) {
             Objects.requireNonNull(it);
 
-            for (int i = from; i < to; i++) {
-                it = ops[i].wrapIterator(it);
+            for (int i = from; i < upTo; i++) {
+                it = pipes[i].op.wrapIterator(
+                        StreamOpFlags.combineStreamFlags(source.getStreamFlags(), pipes[i - 1].combinedOpsFlags),
+                        it);
             }
             return it;
         }
@@ -219,7 +224,7 @@
     class SequentialImplPipelineHelper<P_IN> extends AbstractPipelineHelper<P_IN, E_OUT> {
 
         <R> SequentialImplPipelineHelper(StreamAccessor<P_IN> source, TerminalOp<E_OUT, R> terminal) {
-            super(source, ops());
+            super(source, pipes());
 
             // @@@ Other potential IllegalStateException
             // @@@ If an infinite stream is input to a stateful intermediate op that is required to operate on all
@@ -273,8 +278,8 @@
     class ParallelImplPipelineHelper<P_IN> extends AbstractPipelineHelper<P_IN, E_OUT> implements ParallelPipelineHelper<P_IN, E_OUT> {
         final long targetSize;
 
-        ParallelImplPipelineHelper(StreamAccessor<P_IN> source, IntermediateOp[] ops, int from, int to) {
-            super(source, ops, from, to);
+        ParallelImplPipelineHelper(StreamAccessor<P_IN> source, AbstractPipeline[] pipes, int from, int to) {
+            super(source, pipes, from, to);
             this.targetSize = calculateTargetSize();
         }
 
@@ -285,7 +290,6 @@
                    : 2; // @@@ SWAG
         }
 
-
         //
 
         @Override
@@ -321,19 +325,25 @@
 
     }
 
-    private IntermediateOp[] ops() {
-        IntermediateOp[] ops = new IntermediateOp[depth];
+    // Return the array of pipes, including the head that represents the source
+    private AbstractPipeline[] pipes() {
+        AbstractPipeline[] pipes = new AbstractPipeline[depth + 1];
         AbstractPipeline p = this;
-        for (int i = depth - 1; i >= 0; i--) {
-            ops[i] = p.op;
+        for (int i = depth; i >= 0; i--) {
+            pipes[i] = p;
             p = p.upstream;
         }
-        return ops;
+        return pipes;
     }
 
+    @SuppressWarnings("unchecked")
     public Iterator<E_OUT> iterator() {
         if (iterator == null) {
-            iterator = (Iterator) ((op == null) ? source.iterator() : op.wrapIterator(upstream.iterator()));
+            iterator = (Iterator) ((op == null)
+                                   ? source.iterator()
+                                   : op.wrapIterator(StreamOpFlags.combineStreamFlags(source.getStreamFlags(),
+                                                                                      upstream.combinedOpsFlags),
+                                                     upstream.iterator()));
         }
         return iterator;
     }
--- a/src/share/classes/java/util/streams/ops/ConcatOp.java	Wed Oct 24 14:41:29 2012 -0400
+++ b/src/share/classes/java/util/streams/ops/ConcatOp.java	Wed Oct 24 15:39:53 2012 -0400
@@ -49,14 +49,14 @@
     }
 
     @Override
-    public Iterator<T> wrapIterator(Iterator<T> source) {
+    public Iterator<T> wrapIterator(int flags, Iterator<T> source) {
         Objects.requireNonNull(source);
 
         return Iterators.concat(source, stream.iterator());
     }
 
     @Override
-    public Sink<T> wrapSink(Sink sink) {
+    public Sink<T> wrapSink(int flags, Sink<T> sink) {
         Objects.requireNonNull(sink);
 
         return new Sink.ChainedValue<T>(sink) {
--- a/src/share/classes/java/util/streams/ops/CumulateOp.java	Wed Oct 24 14:41:29 2012 -0400
+++ b/src/share/classes/java/util/streams/ops/CumulateOp.java	Wed Oct 24 15:39:53 2012 -0400
@@ -50,7 +50,7 @@
     }
 
     @Override
-    public TerminalSink<T, T> wrapSink(final Sink sink) {
+    public TerminalSink<T, T> wrapSink(int flags, final Sink<T> sink) {
         return new TerminalSink<T, T>() {
             private boolean first;
             private T state;
@@ -83,7 +83,7 @@
     }
 
     @Override
-    public Iterator<T> wrapIterator(Iterator<T> iterator) {
+    public Iterator<T> wrapIterator(int flags, Iterator<T> iterator) {
         Objects.requireNonNull(iterator);
         return iterator(iterator, op);
     }
@@ -189,7 +189,7 @@
                     }
                     else {
                         leafData = OpUtils.makeNodeBuilderFor(problem.helper, spliterator);
-                        TerminalSink<T, T> terminalSink = wrapSink(leafData);
+                        TerminalSink<T, T> terminalSink = wrapSink(problem.helper.getStreamFlags(), leafData);
                         OpUtils.intoWrapped(spliterator, problem.helper.wrapSink(terminalSink));
                         upward = terminalSink.getAndClearState();
                         // Special case -- if problem.depth == 0, just wrap the result and be done
--- a/src/share/classes/java/util/streams/ops/FilterOp.java	Wed Oct 24 14:41:29 2012 -0400
+++ b/src/share/classes/java/util/streams/ops/FilterOp.java	Wed Oct 24 15:39:53 2012 -0400
@@ -49,12 +49,12 @@
     }
 
     @Override
-    public Iterator<T> wrapIterator(Iterator<T> source) {
+    public Iterator<T> wrapIterator(int flags, Iterator<T> source) {
         return iterator(source, predicate);
     }
 
     @Override
-    public Sink<T> wrapSink(final Sink sink) {
+    public Sink<T> wrapSink(int flags, final Sink sink) {
         Objects.requireNonNull(sink);
         return new Sink.ChainedValue<T>(sink) {
             @Override
--- a/src/share/classes/java/util/streams/ops/FlagDeclaringOp.java	Wed Oct 24 14:41:29 2012 -0400
+++ b/src/share/classes/java/util/streams/ops/FlagDeclaringOp.java	Wed Oct 24 15:39:53 2012 -0400
@@ -44,13 +44,13 @@
     }
 
     @Override
-    public Iterator<T> wrapIterator(final Iterator<T> source) {
+    public Iterator<T> wrapIterator(int flags, final Iterator<T> source) {
         return source;
     }
 
     @Override
     @SuppressWarnings({"rawtypes", "unchecked"})
-    public Sink<T> wrapSink(Sink sink) {
+    public Sink<T> wrapSink(int flags, Sink sink) {
         return sink;
     }
 }
--- a/src/share/classes/java/util/streams/ops/FlatMapOp.java	Wed Oct 24 14:41:29 2012 -0400
+++ b/src/share/classes/java/util/streams/ops/FlatMapOp.java	Wed Oct 24 15:39:53 2012 -0400
@@ -52,13 +52,13 @@
     }
 
     @Override
-    public Iterator<R> wrapIterator(final Iterator<T> source) {
+    public Iterator<R> wrapIterator(int flags, final Iterator<T> source) {
         Objects.requireNonNull(source);
         return iterator(source, mapper);
     }
 
     @Override
-    public Sink<T> wrapSink(final Sink sink) {
+    public Sink<T> wrapSink(int flags, final Sink sink) {
         Objects.requireNonNull(sink);
         return new Sink.ChainedValue<T>(sink) {
             public void accept(T t) {
--- a/src/share/classes/java/util/streams/ops/IntermediateOp.java	Wed Oct 24 14:41:29 2012 -0400
+++ b/src/share/classes/java/util/streams/ops/IntermediateOp.java	Wed Oct 24 15:39:53 2012 -0400
@@ -61,20 +61,24 @@
      * Return an iterator of the elements of the stream. The operation will be
      * performed upon each element as it is returned by {@code Iterator.next()}.
      *
+     *
+     * @param flags the combined stream and operation flags up to but not including this operation.
      * @param in the source stream.
      * @return an iterator of the elements of the stream.
      */
-    Iterator<E_OUT> wrapIterator(Iterator<E_IN> in);
+    Iterator<E_OUT> wrapIterator(int flags, Iterator<E_IN> in);
 
     /**
      * Return a sink which will accept elements, perform the operation upon
      * each element and send it to the provided sink.
      *
+     *
+     * @param flags the combined stream and operation flags up to but not including this operation.
      * @param sink elements will be sent to this sink after the processing.
      * @return a sink which will accept elements and perform the operation upon
      *         each element.
      */
-    Sink<E_IN> wrapSink(Sink sink);
+    Sink<E_IN> wrapSink(int flags, Sink<E_OUT> sink);
 
     StreamShape inputShape() default { return StreamShape.VALUE; }
 
--- a/src/share/classes/java/util/streams/ops/LimitOp.java	Wed Oct 24 14:41:29 2012 -0400
+++ b/src/share/classes/java/util/streams/ops/LimitOp.java	Wed Oct 24 15:39:53 2012 -0400
@@ -50,21 +50,21 @@
     }
 
     @Override
-    public Sink<T> wrapSink(Sink sink) {
+    public Sink<T> wrapSink(int flags, Sink sink) {
         // @@@ Cannot short circuit the sink
         // @@@ This smells somewhat
         throw new UnsupportedOperationException();
     }
 
     @Override
-    public Iterator<T> wrapIterator(Iterator<T> source) {
+    public Iterator<T> wrapIterator(int flags, Iterator<T> source) {
         return iterator(source, limit);
     }
 
     @Override
     public <S> Node<T> evaluateParallel(ParallelPipelineHelper<S, T> helper) {
         // Dumb serial implementation defering to iterator
-        final Iterator<T> i = wrapIterator(helper.iterator());
+        final Iterator<T> i = wrapIterator(helper.getStreamFlags(), helper.iterator());
 
         final int size = helper.isOutputInfinitelySized()
                          ? limit
--- a/src/share/classes/java/util/streams/ops/MapOp.java	Wed Oct 24 14:41:29 2012 -0400
+++ b/src/share/classes/java/util/streams/ops/MapOp.java	Wed Oct 24 15:39:53 2012 -0400
@@ -48,12 +48,12 @@
     }
 
     @Override
-    public Iterator<R> wrapIterator(final Iterator<T> source) {
+    public Iterator<R> wrapIterator(int flags, final Iterator<T> source) {
         return iterator(source, mapper);
     }
 
     @Override
-    public Sink<T> wrapSink(Sink sink) {
+    public Sink<T> wrapSink(int flags, Sink sink) {
         return new Sink.ChainedValue<T>(sink) {
             @Override
             public void accept(T t) {
--- a/src/share/classes/java/util/streams/ops/SkipOp.java	Wed Oct 24 14:41:29 2012 -0400
+++ b/src/share/classes/java/util/streams/ops/SkipOp.java	Wed Oct 24 15:39:53 2012 -0400
@@ -46,7 +46,7 @@
     }
 
     @Override
-    public Sink<T> wrapSink(final Sink sink) {
+    public Sink<T> wrapSink(int flags, final Sink sink) {
         Objects.requireNonNull(sink);
         return new Sink.ChainedValue<T>(sink) {
             int n = skip;
@@ -64,7 +64,7 @@
     }
 
     @Override
-    public Iterator<T> wrapIterator(Iterator<T> source) {
+    public Iterator<T> wrapIterator(int flags, Iterator<T> source) {
         return iterator(source, skip);
     }
 
--- a/src/share/classes/java/util/streams/ops/SortedOp.java	Wed Oct 24 14:41:29 2012 -0400
+++ b/src/share/classes/java/util/streams/ops/SortedOp.java	Wed Oct 24 15:39:53 2012 -0400
@@ -68,7 +68,7 @@
     }
 
     @Override
-    public Sink<T> wrapSink(Sink sink) {
+    public Sink<T> wrapSink(int flags, Sink sink) {
         return new Sink.ChainedValue<T>(sink) {
             PriorityQueue<T> pq;
             @Override
@@ -92,7 +92,7 @@
     }
 
     @Override
-    public Iterator<T> wrapIterator(Iterator<T> iterator) {
+    public Iterator<T> wrapIterator(int flags, Iterator<T> iterator) {
         Objects.requireNonNull(iterator);
         return iterator(iterator, comparator);
     }
--- a/src/share/classes/java/util/streams/ops/StatefulOp.java	Wed Oct 24 14:41:29 2012 -0400
+++ b/src/share/classes/java/util/streams/ops/StatefulOp.java	Wed Oct 24 15:39:53 2012 -0400
@@ -22,7 +22,7 @@
     <P_IN> Node<E_OUT> evaluateSequential(PipelineHelper<P_IN, E_IN> helper) default {
         // @@@ Can we determine the size from the pipeline and this operation?
         final NodeBuilder<E_OUT> nb = Nodes.makeVariableSizeBuilder();
-        helper.into(wrapSink(nb));
+        helper.into(wrapSink(helper.getStreamFlags(), nb));
         return nb.build();
     }
 }
--- a/src/share/classes/java/util/streams/ops/TeeOp.java	Wed Oct 24 14:41:29 2012 -0400
+++ b/src/share/classes/java/util/streams/ops/TeeOp.java	Wed Oct 24 15:39:53 2012 -0400
@@ -40,12 +40,12 @@
     }
 
     @Override
-    public Iterator<T> wrapIterator(final Iterator<T> source) {
+    public Iterator<T> wrapIterator(int flags, final Iterator<T> source) {
         return iterator(source, tee);
     }
 
     @Override
-    public Sink<T> wrapSink(Sink sink) {
+    public Sink<T> wrapSink(int flags, Sink sink) {
         return new Sink.ChainedValue<T>(sink) {
             @Override
             public void accept(T t) {
--- a/src/share/classes/java/util/streams/ops/UniqOp.java	Wed Oct 24 14:41:29 2012 -0400
+++ b/src/share/classes/java/util/streams/ops/UniqOp.java	Wed Oct 24 15:39:53 2012 -0400
@@ -57,7 +57,7 @@
     }
 
     @Override
-    public Sink<T> wrapSink(Sink sink) {
+    public Sink<T> wrapSink(int flags, Sink sink) {
         return new Sink.ChainedValue<T>(sink) {
             Set<T> seen;
 
@@ -83,7 +83,7 @@
     }
 
     @Override
-    public Iterator<T> wrapIterator(final Iterator<T> iterator) {
+    public Iterator<T> wrapIterator(int flags, final Iterator<T> iterator) {
         Objects.requireNonNull(iterator);
         return new Iterator<T>() {
             // Null if empty, non-null if valid. Obviously
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/test-ng/tests/org/openjdk/tests/java/util/streams/ops/FlagOpTest.java	Wed Oct 24 15:39:53 2012 -0400
@@ -0,0 +1,161 @@
+/*
+ * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.  Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+package org.openjdk.tests.java.util.streams.ops;
+
+import org.openjdk.tests.java.util.streams.StreamTestDataProvider;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.streams.Sink;
+import java.util.streams.StreamOpFlags;
+import java.util.streams.ops.FlagDeclaringOp;
+
+import static org.openjdk.tests.java.util.LambdaTestHelpers.countTo;
+
+@Test
+public class FlagOpTest extends StreamOpTestCase {
+
+    @Test(dataProvider = "opArrays", dataProviderClass = StreamTestDataProvider.class)
+    public void testFlagsPassThrough(String name, TestData<Integer> data) {
+
+        @SuppressWarnings({"unchecked", "rawtypes"})
+        TestFlagPassThroughOp<Integer>[] ops = new TestFlagPassThroughOp[3];
+        ops[0] = new TestFlagPassThroughOp<>();
+        ops[1] = new TestFlagPassThroughOp<>();
+        ops[2] = new TestFlagPassThroughOp<>();
+
+        ops[0].set(null, ops[1]);
+        ops[1].set(ops[0], ops[2]);
+        ops[2].set(ops[1], null);
+
+
+        exerciseOps(data, ops);
+    }
+
+    static class TestFlagPassThroughOp<T> extends FlagDeclaringOp<T> {
+        TestFlagPassThroughOp<T> upstream;
+        TestFlagPassThroughOp<T> downstream;
+
+        TestFlagPassThroughOp() {
+            super(0);
+        }
+
+        void set(TestFlagPassThroughOp<T> upstream, TestFlagPassThroughOp<T> downstream)  {
+            this.upstream = upstream;
+            this.downstream = downstream;
+        }
+
+        int wrapFlags;
+
+        @Override
+        public Iterator<T> wrapIterator(int flags, Iterator<T> source) {
+            this.wrapFlags = flags;
+
+            if (upstream != null) {
+                assertTrue(flags == upstream.wrapFlags);
+            }
+
+            return source;
+        }
+
+        @Override
+        @SuppressWarnings({"unchecked", "rawtypes"})
+        public Sink<T> wrapSink(int flags, Sink sink) {
+            this.wrapFlags = flags;
+
+            if (downstream != null) {
+                assertTrue(flags == downstream.wrapFlags);
+            }
+
+            return sink;
+        }
+    }
+
+    public void testFlagsAssigned() {
+        TestData<Integer> data = new CollectionTestData<>("List", countTo(10));
+        // @@@ Not possible to get direct acccess to the flags on stream accessor
+        // @@@ Expands stream flag test data
+        int streamFlags = StreamOpFlags.IS_ORDERED | StreamOpFlags.IS_SIZED;
+
+        // @@@ Expand the op flag test data
+        List<Integer> opFlags = Arrays.asList(
+                StreamOpFlags.NOT_DISTINCT,
+                StreamOpFlags.NOT_ORDERED,
+                StreamOpFlags.NOT_SIZED,
+                StreamOpFlags.NOT_SORTED);
+
+        // Calculate combined ops flags
+        List<Integer> opsFlags = new ArrayList<>();
+        opsFlags.add(StreamOpFlags.INITIAL_OPS_VALUE);
+        for (int i = 0; i < opFlags.size(); i++) {
+            opsFlags.add(StreamOpFlags.combineOpFlags(opFlags.get(i), opsFlags.get(i)));
+        }
+
+        // Calculate combined stream and ops flags
+        List<Integer> flags = new ArrayList<>();
+        for (int i : opsFlags) {
+            flags.add(StreamOpFlags.combineStreamFlags(streamFlags, i));
+        }
+
+        // Create operations
+        List<FlagDeclaringOp<Integer>> ops = new ArrayList<>();
+        for (int i = 0; i < opFlags.size(); i++) {
+            ops.add(new TestFlagExpectedOp<>(flags.get(i), opFlags.get(i)));
+        }
+        ops.add(new TestFlagExpectedOp<>(flags.get(opFlags.size()), 0));
+
+        exerciseOps(data, ops.toArray(createTestFlagExpectedOpArray(ops.size())));
+    }
+
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    static TestFlagExpectedOp<Integer>[] createTestFlagExpectedOpArray(int size) {
+        return new TestFlagExpectedOp[size];
+    }
+    static class TestFlagExpectedOp<T> extends FlagDeclaringOp<T> {
+        final int downstreamFlags;
+
+        TestFlagExpectedOp(int downstreamFlags, int flags) {
+            super(flags);
+            this.downstreamFlags = downstreamFlags;
+        }
+
+        @Override
+        public Iterator<T> wrapIterator(int flags, Iterator<T> downstream) {
+            assertEquals(downstreamFlags, flags);
+            return downstream;
+        }
+
+        @Override
+        @SuppressWarnings({"unchecked", "rawtypes"})
+        public Sink<T> wrapSink(int flags, Sink upstream) {
+            assertEquals(downstreamFlags, flags);
+            return upstream;
+        }
+    }
+
+}
--- a/test-ng/tests/org/openjdk/tests/java/util/streams/ops/StreamOpTestCase.java	Wed Oct 24 14:41:29 2012 -0400
+++ b/test-ng/tests/org/openjdk/tests/java/util/streams/ops/StreamOpTestCase.java	Wed Oct 24 15:39:53 2012 -0400
@@ -56,33 +56,6 @@
 
     @SuppressWarnings({"rawtypes", "unchecked"})
     public static enum IntermediateOpTest {
-        // Create a sink and wrap it
-        DATA_FOR_EACH_TO_WRAPPED_SINK(false) {
-            boolean isApplicable(IntermediateOp[] ops) {
-                return !isShortCircuit(ops);
-            }
-
-            <T> void run(TestData<T> data, Sink sink, IntermediateOp[] ops) {
-                Sink<T> wrapped = sink(sink, ops);
-                wrapped.begin(data.size());
-                data.forEach(wrapped);
-                wrapped.end();
-            }
-        },
-
-        // Create a sink and wrap it, report the size as unknown
-        DATA_FOR_EACH_TO_WRAPPED_SINK_SIZE_UNKNOWN(false) {
-            boolean isApplicable(IntermediateOp[] ops) {
-                return !isShortCircuit(ops);
-            }
-
-            <T> void run(TestData<T> data, Sink sink, IntermediateOp[] ops) {
-                Sink<T> wrapped = sink(sink, ops);
-                wrapped.begin(-1);
-                data.forEach(wrapped);
-                wrapped.end();
-            }
-        },
 
         // Wrap with SequentialPipeline.op, and iterate in push mode
         STREAM_FOR_EACH(false) {
@@ -262,12 +235,13 @@
                                                         Factory<IntermediateOp[]> fops) {
         StreamResult<U> refResult = new StreamResult<>(b.data.size());
 
-        // First pass -- grab an iterator and wrap it, and call that the reference result
+        // First pass -- sequentially into a list, and call that the reference result
         b.before.apply(b.data);
-        Iterator<U> it = (Iterator<U>) b.data.iterator(fops.make());
-        while (it.hasNext())
-            refResult.accept(it.next());
+        ArrayList<U> elements = (ArrayList<U>)stream(b.data.seq(fops.make())).into(new ArrayList());
         b.after.apply(b.data);
+        for (U e : elements) {
+            refResult.accept(e);
+        }
 
         for (IntermediateOpTest test : b.testSet) {
             IntermediateOp[] ops = fops.make();
@@ -470,7 +444,7 @@
     @SuppressWarnings({ "rawtypes", "unchecked" })
     private static<U> U chain(AbstractPipeline pipe, TerminalOp<?, U> op) {
         switch (pipe.getShape()) {
-            case VALUE: return (U) ((ValuePipeline) pipe).pipeline(op);
+            case VALUE: return (U) pipe.pipeline(op);
             default: throw new IllegalStateException(pipe.getShape().toString());
         }
     }
@@ -494,13 +468,6 @@
         return (Stream<U>) pipe;
     }
 
-    @SuppressWarnings("rawtypes")
-    protected static Sink sink(Sink sink, IntermediateOp[] ops) {
-        for (int i=ops.length-1; i >= 0; i--)
-            sink = ops[i].wrapSink(sink);
-        return sink;
-    }
-
     public static interface TestData<T> extends Traversable<T>, Sized {
         AbstractPipeline<?, T> seq();
         AbstractPipeline<?, T> par();
@@ -533,14 +500,6 @@
         }
 
         @SuppressWarnings({ "rawtypes", "unchecked" })
-        Iterator<?> iterator(IntermediateOp... ops) default {
-            Iterator<T> iterator = iterator();
-            for (IntermediateOp op : ops)
-                iterator = op.wrapIterator(iterator);
-            return iterator;
-        }
-
-        @SuppressWarnings({ "rawtypes", "unchecked" })
         AbstractPipeline<?, ?> seq(IntermediateOp... ops) default {
             return chain(seq(), ops);
         }
--- a/test-ng/tests/org/openjdk/tests/java/util/streams/ops/UniqOpTest.java	Wed Oct 24 14:41:29 2012 -0400
+++ b/test-ng/tests/org/openjdk/tests/java/util/streams/ops/UniqOpTest.java	Wed Oct 24 15:39:53 2012 -0400
@@ -28,7 +28,6 @@
 import org.openjdk.tests.java.util.streams.StreamTestDataProvider;
 import org.testng.annotations.Test;
 
-import java.util.streams.ops.MapOp;
 import java.util.streams.ops.SortedOp;
 import java.util.streams.ops.UniqOp;
 
@@ -40,13 +39,6 @@
  */
 @Test
 public class UniqOpTest extends StreamOpTestCase {
-    public void testRawIterator() {
-        assertCountSum(UniqOp.<Integer>singleton().wrapIterator(repeat(0, 10).iterator()), 1 , 0);
-        assertCountSum(UniqOp.<Integer>singleton().wrapIterator(repeat(1, 10).iterator()), 1 , 1);
-        assertCountSum(UniqOp.<Integer>singleton().wrapIterator(countTo(0).iterator()), 0, 0);
-        assertCountSum(UniqOp.<Integer>singleton().wrapIterator(countTo(10).iterator()), 10, 55);
-        assertCountSum(UniqOp.<Integer>singleton().wrapIterator(countTo(10).iterator()), 10, 55);
-    }
 
     public void testUniqOp() {
         assertCountSum(repeat(0, 10).stream().uniqueElements(), 1 , 0);