changeset 6623:170df812b0b2

- a parent stream may only have one child stream i.e. linear pipeline. - AbstractPipeline now holds an IntermediateOp[] and no longer holds a reference to an upstream AbstractPipeline instance.
author psandoz
date Wed, 21 Nov 2012 15:06:06 +0100
parents c84f27e782f0
children 0f7111a3ef65
files src/share/classes/java/util/stream/AbstractPipeline.java src/share/classes/java/util/stream/PipelineHelper.java src/share/classes/java/util/stream/op/CumulateOp.java src/share/classes/java/util/stream/op/GroupByOp.java src/share/classes/java/util/stream/op/IntermediateOp.java src/share/classes/java/util/stream/op/ReduceByOp.java src/share/classes/java/util/stream/op/SortedOp.java src/share/classes/java/util/stream/op/UniqOp.java src/share/classes/java/util/stream/primitive/IntLimitOp.java src/share/classes/java/util/stream/primitive/Primitives.java test-ng/tests/org/openjdk/tests/java/util/stream/OpTestCase.java test-ng/tests/org/openjdk/tests/java/util/stream/StreamLinkTest.java test-ng/tests/org/openjdk/tests/java/util/stream/StreamOpFlagsTest.java test-ng/tests/org/openjdk/tests/java/util/stream/StreamReuseTest.java test-ng/tests/org/openjdk/tests/java/util/stream/op/SortedOpTest.java test-ng/tests/org/openjdk/tests/java/util/stream/op/UniqOpTest.java
diffstat 16 files changed, 222 insertions(+), 97 deletions(-) [+]
line wrap: on
line diff
--- a/src/share/classes/java/util/stream/AbstractPipeline.java	Wed Nov 21 11:01:36 2012 +0100
+++ b/src/share/classes/java/util/stream/AbstractPipeline.java	Wed Nov 21 15:06:06 2012 +0100
@@ -76,10 +76,12 @@
  * @author Brian Goetz
  */
 public abstract class AbstractPipeline<E_IN, E_OUT> implements BaseStream<E_OUT> {
-    protected final AbstractPipeline<?, E_IN> upstream;
-    protected final IntermediateOp<E_IN, E_OUT> op;
+    private static final IntermediateOp[] EMPTY_OPS_ARRAY = new IntermediateOp[0];
+
+    protected boolean linked;
+    protected final SourceState<?> sourceState;
     protected final int depth;
-    protected final SourceState<?> sourceState;
+    protected final IntermediateOp[] ops;
     protected final int combinedOpFlags;
 
     /**
@@ -91,10 +93,11 @@
      */
     protected AbstractPipeline(Spliterator<?> spliterator, int sourceFlags, StreamShape shape) {
         Objects.requireNonNull(spliterator);
-        this.upstream = null;
-        this.op = null;
+
+        this.linked = false;
+        this.sourceState = new SpliteratorSourceState<>(spliterator, sourceFlags, shape);
         this.depth = 0;
-        this.sourceState = new SpliteratorSourceState<>(spliterator, sourceFlags, shape);
+        this.ops = EMPTY_OPS_ARRAY;
         this.combinedOpFlags = StreamOpFlags.INITIAL_OPS_VALUE;
     }
 
@@ -105,14 +108,27 @@
      * @param op the operation performed upon elements.
      */
     protected AbstractPipeline(AbstractPipeline<?, E_IN> upstream, IntermediateOp<E_IN, E_OUT> op) {
-        this.upstream = Objects.requireNonNull(upstream);
-        this.op = Objects.requireNonNull(op);
+        Objects.requireNonNull(upstream);
+
+        if (upstream.linked) {
+            throw new IllegalStateException("Parent stream is already linked to a child stream");
+        }
+        upstream.linked = true;
+
+        this.linked = false;
+        this.sourceState = upstream.sourceState;
         this.depth = upstream.depth + 1;
-        this.sourceState = upstream.sourceState;
+
+        if (upstream.ops.length >= depth) {
+            this.ops = upstream.ops;
+        } else {
+            this.ops = Arrays.copyOf(upstream.ops, upstream.ops.length + 4);
+        }
+        ops[depth - 1] = Objects.requireNonNull(op);
+
         this.combinedOpFlags = StreamOpFlags.combineOpFlags(op.getOpFlags(), upstream.combinedOpFlags);
 
         assert upstream.getOutputShape().getStreamType() == op.inputShape().getStreamType();
-        assert (upstream.depth == 0) ^ (upstream.op != null);
     }
 
     protected<R> R evaluate(TerminalOp<E_OUT, R> terminal) {
@@ -124,30 +140,25 @@
     }
 
     protected<R> R evaluateParallel(TerminalOp<E_OUT, R> terminal) {
-        final IntermediateOp[] ops = ops();
-        // Ops flags length is one greater than op array to hold initial value
+        // Combined flags length is one greater than depth to hold initial value
         // Given an op whose index is i then the flags input to that op are at flags[i]
         // and the flags output from that op are at flags[i + 1]
-        final int[] opsFlags = new int[ops.length + 1];
+        final int[] opsFlags = new int[depth + 1];
         int fromOp = 0;
         int upToOp = 0;
         Node<?> iNode = null;
         Spliterator<?> iSource = sourceState.spliterator();
         int iSourceFlags = sourceState.getSourceFlags();
         while (true) {
-            while (upToOp < ops.length && !ops[upToOp].isStateful())
+            while (upToOp < depth && !ops[upToOp].isStateful())
                 upToOp++;
 
-            if (upToOp < ops.length) {
+            if (upToOp < depth) {
                 IntermediateOp<?, ?> statefulOp = ops[upToOp];
-                iNode = evaluateParallel(iNode, iSource, iSourceFlags, opsFlags, ops, fromOp, upToOp, statefulOp);
+                iNode = evaluateParallel(iNode, iSource, iSourceFlags, opsFlags, fromOp, upToOp, statefulOp);
 
-                // Get the combined stream and ops flags for the stateful op
-                int sourceAndOpsFlags = StreamOpFlags.combineStreamFlags(
-                        iSourceFlags,
-                        StreamOpFlags.combineOpFlags(statefulOp.getOpFlags(), opsFlags[upToOp]));
                 // Get the source flags for the intermediate stream
-                iSourceFlags = StreamOpFlags.getStreamFlags(sourceAndOpsFlags)
+                iSourceFlags = StreamOpFlags.getStreamFlags(StreamOpFlags.combineOpFlags(statefulOp.getOpFlags(), opsFlags[upToOp]))
                                | StreamOpFlags.IS_SIZED | StreamOpFlags.IS_ORDERED;
                 // Create stream accessor from node using the stream flags
                 iSource = iNode.spliterator();
@@ -155,11 +166,11 @@
                 fromOp = ++upToOp;
 
                 if (!StreamOpFlags.PARALLEL.isKnown(iSourceFlags)) {
-                    return evaluateSequential(iNode, iSource, iSourceFlags, opsFlags, ops, fromOp, ops.length, terminal);
+                    return evaluateSequential(iNode, iSource, iSourceFlags, opsFlags, fromOp, depth, terminal);
                 }
             }
             else {
-                return evaluateParallel(iNode, iSource, iSourceFlags, opsFlags, ops, fromOp, upToOp, terminal);
+                return evaluateParallel(iNode, iSource, iSourceFlags, opsFlags, fromOp, upToOp, terminal);
             }
         }
     }
@@ -169,10 +180,10 @@
                                    Spliterator<?> spliterator,
                                    int sourceFlags,
                                    int[] opsFlags,
-                                   IntermediateOp[] ops, int from, int upTo,
+                                   int from, int upTo,
                                    StreamOp<?, R> terminal) {
         return (R) terminal.evaluateParallel(new ParallelImplPipelineHelper(node, spliterator, sourceFlags,
-                                                                            opsFlags, ops, from, upTo,
+                                                                            opsFlags, from, upTo,
                                                                             terminal.inputShape()));
     }
 
@@ -181,10 +192,10 @@
                                      Spliterator<?> spliterator,
                                      int sourceFlags,
                                      int[] opsFlags,
-                                     IntermediateOp[] ops, int from, int upTo,
+                                     int from, int upTo,
                                      TerminalOp<E_OUT, R> terminal) {
         return (R) terminal.evaluateSequential(new SequentialImplPipelineHelper(node, spliterator, sourceFlags,
-                                                                                opsFlags, ops, from, upTo,
+                                                                                opsFlags, from, upTo,
                                                                                 terminal.inputShape()));
     }
 
@@ -201,7 +212,6 @@
         final int from;
         final int upTo;
         final int sourceFlags;
-        final int sourceAndOpsFlags;
         final StreamShape terminalShape;
 
         AbstractPipelineHelper(Spliterator<P_IN> spliterator, int sourceFlags,
@@ -222,20 +232,19 @@
             this.sourceFlags = sourceFlags;
             this.terminalShape = terminalShape;
 
-            int flags = opsFlags[from] = StreamOpFlags.INITIAL_OPS_VALUE;
+            // Stream flags are combined with the initial ops flags at the start
+            int flags = opsFlags[from] = StreamOpFlags.combineStreamFlags(sourceFlags, StreamOpFlags.INITIAL_OPS_VALUE);
             for (int i = from; i < upTo; i++) {
                 flags = opsFlags[i + 1] = StreamOpFlags.combineOpFlags(ops[i].getOpFlags(), flags);
             }
-
-            this.sourceAndOpsFlags = StreamOpFlags.combineStreamFlags(sourceFlags, flags);
         }
 
         protected boolean isOutputSizeKnown() {
-            return StreamOpFlags.SIZED.isKnown(sourceAndOpsFlags);
+            return StreamOpFlags.SIZED.isKnown(getFlags());
         }
 
         protected boolean isShortCircuit() {
-            return StreamOpFlags.SHORT_CIRCUIT.isKnown(sourceAndOpsFlags);
+            return StreamOpFlags.SHORT_CIRCUIT.isKnown(getFlags());
         }
 
         protected boolean hasZeroDepth() {
@@ -257,8 +266,8 @@
         }
 
         @Override
-        public int getStreamFlags() {
-            return sourceAndOpsFlags;
+        public int getFlags() {
+            return opsFlags[upTo];
         }
 
         @Override
@@ -272,7 +281,7 @@
 
             for (int i = upTo - 1; i >= from; i--) {
                 sink = ops[i].wrapSink(
-                        StreamOpFlags.combineStreamFlags(sourceFlags, opsFlags[i]),
+                        opsFlags[i],
                         sink);
             }
             return sink;
@@ -284,7 +293,7 @@
 
             for (int i = from; i < upTo; i++) {
                 it = ops[i].wrapIterator(
-                        StreamOpFlags.combineStreamFlags(sourceFlags, opsFlags[i]),
+                        opsFlags[i],
                         it);
             }
             return it;
@@ -293,21 +302,14 @@
 
     class SequentialImplPipelineHelper<P_IN> extends AbstractPipelineHelper<P_IN, E_OUT> {
 
-        SequentialImplPipelineHelper(Spliterator<P_IN> spliterator, int sourceFlags,
-                                     IntermediateOp[] ops,
+        SequentialImplPipelineHelper(Node<P_IN> node, Spliterator<P_IN> spliterator, int sourceFlags,
+                                     int[] opsFlags, int from, int to,
                                      StreamShape terminalShape) {
-            // Start from 2nd element since 1st is the head containing the source
-            super(spliterator, sourceFlags, new int[ops.length + 1], ops, 0, ops.length, terminalShape);
+            super(node, spliterator, sourceFlags, opsFlags, AbstractPipeline.this.ops, from, to, terminalShape);
         }
 
-        SequentialImplPipelineHelper(Node<P_IN> node, Spliterator<P_IN> spliterator, int sourceFlags,
-                                     int[] opsFlags, IntermediateOp[] ops, int from, int to,
-                                     StreamShape terminalShape) {
-            super(node, spliterator, sourceFlags, opsFlags, ops, from, to, terminalShape);
-        }
-
-        <R> SequentialImplPipelineHelper(Spliterator spliterator, int sourceFlags, StreamShape terminalShape) {
-            this(spliterator, sourceFlags, ops(), terminalShape);
+        <R> SequentialImplPipelineHelper(Spliterator<P_IN> spliterator, int sourceFlags, StreamShape terminalShape) {
+            super(spliterator, sourceFlags, new int[depth + 1], AbstractPipeline.this.ops, 0, depth, terminalShape);
         }
 
         boolean requirePull() {
@@ -351,9 +353,9 @@
         final long targetSize;
 
         ParallelImplPipelineHelper(Node<P_IN> node, Spliterator<P_IN> spliterator, int sourceFlags,
-                                   int[] opsFlags, IntermediateOp[] ops, int from, int to,
+                                   int[] opsFlags, int from, int to,
                                    StreamShape terminalShape) {
-            super(node, spliterator, sourceFlags, opsFlags, ops, from, to, terminalShape);
+            super(node, spliterator, sourceFlags, opsFlags, AbstractPipeline.this.ops, from, to, terminalShape);
             this.targetSize = calculateTargetSize();
         }
 
@@ -409,16 +411,6 @@
         }
     }
 
-    private IntermediateOp[] ops() {
-        IntermediateOp[] ops = new IntermediateOp[depth];
-        AbstractPipeline p = this;
-        for (int i = depth - 1; i >= 0; i--) {
-            ops[i] = p.op;
-            p = p.upstream;
-        }
-        return ops;
-    }
-
     // Chaining and result methods
 
     /**
@@ -490,7 +482,7 @@
      * source. Otherwise, it's output shape corresponds to the output shape of the associated operation.
      */
     public StreamShape getOutputShape() {
-        return op == null ? sourceState.getSourceShape() : op.outputShape();
+        return depth == 0 ? sourceState.getSourceShape() : ops[depth - 1].outputShape();
     }
 
     /**
@@ -500,7 +492,7 @@
      * source. Otherwise, it's input shape corresponds to the input shape of the associated operation.
      */
     public StreamShape getInputShape() {
-        return op == null ? sourceState.getSourceShape() : op.inputShape();
+        return depth == 0 ? sourceState.getSourceShape() : ops[depth - 1].inputShape();
     }
 
     // BaseStream
@@ -510,20 +502,12 @@
     public Iterator<E_OUT> iterator() {
         Iterator iterator = sourceState.iterator();
 
-        AbstractPipeline[] pipes = new AbstractPipeline[depth + 1];
-        AbstractPipeline p = this;
-        for (int i = depth; i >= 0; i--) {
-            pipes[i] = p;
-            p = p.upstream;
-        }
-
         // Ensure the parallel flag is cleared, if set
         int sourceFlags = StreamOpFlags.PARALLEL.clearFromFlags(sourceState.getSourceFlags());
         int opsFlags = StreamOpFlags.INITIAL_OPS_VALUE;
-        for (int i = 1; i <= depth; i++) {
-            p = pipes[i];
-            iterator = p.op.wrapIterator(StreamOpFlags.combineStreamFlags(sourceFlags, opsFlags), iterator);
-            opsFlags = StreamOpFlags.combineOpFlags(pipes[i].op.getOpFlags(), opsFlags);
+        for (int i = 0; i < depth; i++) {
+            iterator = ops[i].wrapIterator(StreamOpFlags.combineStreamFlags(sourceFlags, opsFlags), iterator);
+            opsFlags = StreamOpFlags.combineOpFlags(ops[i].getOpFlags(), opsFlags);
         }
 
         return iterator;
--- a/src/share/classes/java/util/stream/PipelineHelper.java	Wed Nov 21 11:01:36 2012 +0100
+++ b/src/share/classes/java/util/stream/PipelineHelper.java	Wed Nov 21 15:06:06 2012 +0100
@@ -34,10 +34,10 @@
 public interface PipelineHelper<P_IN, P_OUT> {
 
     /**
-     * @return the combined stream and operation flags.
+     * @return the combined stream and operation flags for the output of the pipeline.
      * @see {@link StreamOpFlags}
      */
-    int getStreamFlags();
+    int getFlags();
 
     /**
      * Get the size of the stream output from the pipeline if known.
--- a/src/share/classes/java/util/stream/op/CumulateOp.java	Wed Nov 21 11:01:36 2012 +0100
+++ b/src/share/classes/java/util/stream/op/CumulateOp.java	Wed Nov 21 15:06:06 2012 +0100
@@ -194,7 +194,7 @@
                     }
                     else {
                         leafData = OpUtils.makeNodeBuilderFor(problem.helper, spliterator);
-                        TerminalSink<T, T> terminalSink = wrapSink(problem.helper.getStreamFlags(), leafData);
+                        TerminalSink<T, T> terminalSink = wrapSink(problem.helper.getFlags(), leafData);
                         OpUtils.intoWrapped(spliterator, problem.helper.wrapSink(terminalSink));
                         upward = terminalSink.getAndClearState();
                         // Special case -- if problem.depth == 0, just wrap the result and be done
--- a/src/share/classes/java/util/stream/op/GroupByOp.java	Wed Nov 21 11:01:36 2012 +0100
+++ b/src/share/classes/java/util/stream/op/GroupByOp.java	Wed Nov 21 15:06:06 2012 +0100
@@ -67,7 +67,7 @@
 
     @Override
     public <S> Map<K, Collection<T>> evaluateParallel(ParallelPipelineHelper<S, T> helper) {
-        if (StreamOpFlags.ORDERED.isKnown(helper.getStreamFlags())) {
+        if (StreamOpFlags.ORDERED.isKnown(helper.getFlags())) {
             // @@@ Should be able to use a ctor ref here, but we get a runtime failure
             return OpUtils.parallelReduce(helper, () -> new GroupBySink());
         }
--- a/src/share/classes/java/util/stream/op/IntermediateOp.java	Wed Nov 21 11:01:36 2012 +0100
+++ b/src/share/classes/java/util/stream/op/IntermediateOp.java	Wed Nov 21 15:06:06 2012 +0100
@@ -89,7 +89,7 @@
     default <P_IN> Node<E_OUT> evaluateSequential(PipelineHelper<P_IN, E_IN> helper) {
         // @@@ Can we determine the size from the pipeline and this operation?
         final NodeBuilder<E_OUT> nb = outputShape().makeNodeBuilder(-1);
-        helper.into(wrapSink(helper.getStreamFlags(), nb));
+        helper.into(wrapSink(helper.getFlags(), nb));
         return nb.build();
     }
 
--- a/src/share/classes/java/util/stream/op/ReduceByOp.java	Wed Nov 21 11:01:36 2012 +0100
+++ b/src/share/classes/java/util/stream/op/ReduceByOp.java	Wed Nov 21 15:06:06 2012 +0100
@@ -92,7 +92,7 @@
 
     @Override
     public <S> Map<U, W> evaluateParallel(ParallelPipelineHelper<S, T> helper) {
-        if (StreamOpFlags.ORDERED.isKnown(helper.getStreamFlags())) {
+        if (StreamOpFlags.ORDERED.isKnown(helper.getFlags())) {
             // @@@ Should be able to use a ctor ref here, but we get a runtime failure
             return OpUtils.parallelReduce(helper, () -> new ReduceBySink());
         }
--- a/src/share/classes/java/util/stream/op/SortedOp.java	Wed Nov 21 11:01:36 2012 +0100
+++ b/src/share/classes/java/util/stream/op/SortedOp.java	Wed Nov 21 15:06:06 2012 +0100
@@ -167,7 +167,7 @@
     }
 
     public <P_IN> Node<T> evaluateParallel(ParallelPipelineHelper<P_IN, T> helper) {
-        if (StreamOpFlags.SORTED.isKnown(helper.getStreamFlags())) {
+        if (StreamOpFlags.SORTED.isKnown(helper.getFlags())) {
             return helper.collectOutput();
         }
         else {
--- a/src/share/classes/java/util/stream/op/UniqOp.java	Wed Nov 21 11:01:36 2012 +0100
+++ b/src/share/classes/java/util/stream/op/UniqOp.java	Wed Nov 21 15:06:06 2012 +0100
@@ -182,12 +182,12 @@
 
     @Override
     public <S> Node<T> evaluateParallel(ParallelPipelineHelper<S, T> helper) {
-        if (StreamOpFlags.DISTINCT.isKnown(helper.getStreamFlags())) {
+        if (StreamOpFlags.DISTINCT.isKnown(helper.getFlags())) {
             // No-op
             return helper.collectOutput();
         }
-        else if (StreamOpFlags.SORTED.isKnown(helper.getStreamFlags())) {
-            if (!StreamOpFlags.ORDERED.isKnown(helper.getStreamFlags())) {
+        else if (StreamOpFlags.SORTED.isKnown(helper.getFlags())) {
+            if (!StreamOpFlags.ORDERED.isKnown(helper.getFlags())) {
                 // @@@ Does this make sense?
             }
 
@@ -195,7 +195,7 @@
             return Nodes.node(s);
         }
         else {
-            if (StreamOpFlags.ORDERED.isKnown(helper.getStreamFlags())) {
+            if (StreamOpFlags.ORDERED.isKnown(helper.getFlags())) {
                 Set<T> s = OpUtils.parallelReduce(helper, () -> new UniqByPreserveOrderSink<T>());
                 return Nodes.node(s);
             }
--- a/src/share/classes/java/util/stream/primitive/IntLimitOp.java	Wed Nov 21 11:01:36 2012 +0100
+++ b/src/share/classes/java/util/stream/primitive/IntLimitOp.java	Wed Nov 21 15:06:06 2012 +0100
@@ -24,14 +24,11 @@
  */
 package java.util.stream.primitive;
 
-import java.util.Collections;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
 import java.util.Objects;
 import java.util.stream.*;
 import java.util.stream.op.Node;
-import java.util.stream.op.NodeBuilder;
-import java.util.stream.op.Nodes;
 import java.util.stream.op.StatefulOp;
 
 public class IntLimitOp implements StatefulOp<Integer, Integer> {
@@ -72,7 +69,7 @@
     @Override
     public <S> Node<Integer> evaluateParallel(ParallelPipelineHelper<S, Integer> helper) {
         // Dumb serial implementation defering to iterator
-        final IntIterator i = wrapIterator(helper.getStreamFlags(), helper.iterator());
+        final IntIterator i = wrapIterator(helper.getFlags(), helper.iterator());
 
         final IntNodeBuilder nb = IntNodes.makeBuilder(Math.min(helper.getOutputSizeIfKnown(), limit));
         i.forEach(nb);
--- a/src/share/classes/java/util/stream/primitive/Primitives.java	Wed Nov 21 11:01:36 2012 +0100
+++ b/src/share/classes/java/util/stream/primitive/Primitives.java	Wed Nov 21 15:06:06 2012 +0100
@@ -65,6 +65,16 @@
         }
     }
 
+    // IntIterable
+
+    public static IntIterable iterable(int[] array) {
+        return () -> iterator(array, 0, array.length);
+    }
+
+    public static IntIterable iterable(int[] array, int offset, int length) {
+        return () -> iterator(array, offset, length);
+    }
+
     // IntIterator
 
     public static IntIterator emptyIntIterator() {
--- a/test-ng/tests/org/openjdk/tests/java/util/stream/OpTestCase.java	Wed Nov 21 11:01:36 2012 +0100
+++ b/test-ng/tests/org/openjdk/tests/java/util/stream/OpTestCase.java	Wed Nov 21 15:06:06 2012 +0100
@@ -185,8 +185,13 @@
 
         @SuppressWarnings("unchecked")
         public <I extends Iterable<U> & Sized> ExcerciseDataStreamBuilder<T, U, S_IN, S_OUT> expectedResult(I expectedResult) {
-            NodeBuilder<U> resultBuilder = shape.makeNodeBuilder(expectedResult.size());
-            resultBuilder.begin(expectedResult.size());
+            return expectedResult(expectedResult, expectedResult.size());
+        }
+
+        @SuppressWarnings("unchecked")
+        public ExcerciseDataStreamBuilder<T, U, S_IN, S_OUT> expectedResult(Iterable<U> expectedResult, int size) {
+            NodeBuilder<U> resultBuilder = shape.makeNodeBuilder(size);
+            resultBuilder.begin(size);
             expectedResult.forEach(resultBuilder);
             resultBuilder.end();
             this.refResult = resultBuilder.build();
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/test-ng/tests/org/openjdk/tests/java/util/stream/StreamLinkTest.java	Wed Nov 21 15:06:06 2012 +0100
@@ -0,0 +1,74 @@
+/*
+ * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.  Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+package org.openjdk.tests.java.util.stream;
+
+import org.openjdk.tests.java.util.stream.primitive.IntStreamTestData;
+import org.openjdk.tests.java.util.stream.primitive.IntStreamTestDataProvider;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.function.Function;
+import java.util.stream.BaseStream;
+import java.util.stream.Stream;
+import java.util.stream.primitive.IntStream;
+import java.util.stream.primitive.Primitives;
+
+@Test
+public class StreamLinkTest extends OpTestCase {
+
+    private <T, S extends BaseStream<T>> Function<S, S> apply(int n, Function<S, S> f) {
+        return s -> {
+            for (int i = 0; i < n; i++) {
+                s = f.apply(s);
+            }
+            return s;
+        };
+    }
+
+    @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
+    public void testManyStreams(String name, StreamTestData<Integer> data) {
+        for (int n : Arrays.asList(0, 1, 2, 3, 4, 5, 255, 1000)) {
+            List<Integer> expected = data.stream().map(e -> (Integer) (e + n)).into(new ArrayList<Integer>());
+            withData(data).
+                    stream(apply(n, (Stream<Integer> s) -> s.map(e -> (Integer) (e + 1)))).
+                    expectedResult(expected).
+                    exercise();
+        }
+    }
+
+    @Test(dataProvider = "IntStreamTestData", dataProviderClass = IntStreamTestDataProvider.class)
+    public void testManyStreams(String name, IntStreamTestData data) {
+        for (int n : Arrays.asList(0, 1, 2, 3, 4, 5, 255, 1000)) {
+            int[] expected = data.stream().map(e -> e + n).toArray();
+
+            withData(data).
+                    stream(apply(n, (IntStream s) -> s.map(e -> e + 1))).
+                    expectedResult(Primitives.iterable(expected), expected.length).
+                    exercise();
+        }
+    }
+}
--- a/test-ng/tests/org/openjdk/tests/java/util/stream/StreamOpFlagsTest.java	Wed Nov 21 11:01:36 2012 +0100
+++ b/test-ng/tests/org/openjdk/tests/java/util/stream/StreamOpFlagsTest.java	Wed Nov 21 15:06:06 2012 +0100
@@ -129,7 +129,28 @@
             }
     }
 
-    public void test() {
+    public void testCombineBefore() {
+        int sourceFlags = StreamOpFlags.IS_SIZED | StreamOpFlags.IS_DISTINCT;
+
+        List<Integer> ops = Arrays.asList(StreamOpFlags.NOT_SIZED, StreamOpFlags.IS_ORDERED | StreamOpFlags.IS_SORTED);
+
+        int flags = StreamOpFlags.combineStreamFlags(sourceFlags, StreamOpFlags.INITIAL_OPS_VALUE);
+        for (int opFlags : ops) {
+            flags = combineOpFlags(opFlags, flags);
+        }
+
+        assertFalse(StreamOpFlags.SIZED.isKnown(flags));
+        assertTrue(StreamOpFlags.DISTINCT.isKnown(flags));
+        assertTrue(StreamOpFlags.SORTED.isKnown(flags));
+        assertTrue(StreamOpFlags.ORDERED.isKnown(flags));
+
+        assertFalse(StreamOpFlags.SIZED.isKnownOnOpFlags(flags));
+        assertTrue(StreamOpFlags.DISTINCT.isKnownOnOpFlags(flags));
+        assertTrue(StreamOpFlags.SORTED.isKnownOnOpFlags(flags));
+        assertTrue(StreamOpFlags.ORDERED.isKnownOnOpFlags(flags));
+    }
+
+    public void testCombineAfter() {
         int sourceFlags = StreamOpFlags.IS_SIZED | StreamOpFlags.IS_DISTINCT;
 
         List<Integer> ops = Arrays.asList(StreamOpFlags.NOT_SIZED, StreamOpFlags.IS_ORDERED | StreamOpFlags.IS_SORTED);
--- a/test-ng/tests/org/openjdk/tests/java/util/stream/StreamReuseTest.java	Wed Nov 21 11:01:36 2012 +0100
+++ b/test-ng/tests/org/openjdk/tests/java/util/stream/StreamReuseTest.java	Wed Nov 21 15:06:06 2012 +0100
@@ -87,6 +87,14 @@
     // Stream
 
     @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
+    public void testTwoStreams(String name, StreamTestData<Integer> data) {
+        assertSecondFails(data,
+                          (Stream<Integer> s) -> s.map(i -> i), (Stream<Integer> s) -> s.map(i -> i),
+                          IllegalStateException.class,
+                          "Stream map / map succeeded erroneously");
+    }
+
+    @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
     public void testTwoTerminals(String name, StreamTestData<Integer> data) {
         assertSecondFails(data,
                           (Stream<Integer> s) -> s.findFirst(), (Stream<Integer> s) -> s.findFirst(),
@@ -107,16 +115,32 @@
         assertSecondFails(data,
                           (Stream<Integer> s) -> s.iterator(), (Stream<Integer> s) -> s.findFirst(),
                           IllegalStateException.class,
-                          "Stream findFirst / iterator succeeded erroneously");
+                          "Stream iterator / findFirst succeeded erroneously");
         assertSecondFails(data,
                           (Stream<Integer> s) -> s.findFirst(), (Stream<Integer> s) -> s.iterator(),
                           IllegalStateException.class,
-                          "Stream iterator / findFirst succeeded erroneously");
+                          "Stream findFirst / iterator succeeded erroneously");
     }
 
     // IntStream
 
     @Test(dataProvider = "IntStreamTestData", dataProviderClass = IntStreamTestDataProvider.class)
+    public void testTwoStreams(String name, IntStreamTestData data) {
+        assertSecondFails(data,
+                          (IntStream s) -> s.map(i -> i), (IntStream s) -> s.map(i -> i),
+                          IllegalStateException.class,
+                          "IntStream map / map succeeded erroneously");
+    }
+
+    @Test(dataProvider = "IntStreamTestData", dataProviderClass = IntStreamTestDataProvider.class)
+    public void testTwoTerminals(String name, IntStreamTestData data) {
+        assertSecondFails(data,
+                          (IntStream s) -> s.sum(), (IntStream s) -> s.sum(),
+                          IllegalStateException.class,
+                          "IntStream sum / sum succeeded erroneously");
+    }
+
+    @Test(dataProvider = "IntStreamTestData", dataProviderClass = IntStreamTestDataProvider.class)
     public void testTwoIterators(String name, IntStreamTestData data) {
         assertSecondFails(data,
                           (IntStream s) -> s.iterator(), (IntStream s) -> s.iterator(),
@@ -124,5 +148,15 @@
                           "IntStream iterator / iterator succeeded erroneously");
     }
 
-    // @@@ findFirst tests when implemented
+    @Test(dataProvider = "IntStreamTestData", dataProviderClass = IntStreamTestDataProvider.class)
+    public void testTerminalIterator(String name, IntStreamTestData data) {
+        assertSecondFails(data,
+                          (IntStream s) -> s.iterator(), (IntStream s) -> s.sum(),
+                          IllegalStateException.class,
+                          "IntStream iterator / sum succeeded erroneously");
+        assertSecondFails(data,
+                          (IntStream s) -> s.sum(), (IntStream s) -> s.iterator(),
+                          IllegalStateException.class,
+                          "Stream sum / iterator succeeded erroneously");
+    }
 }
--- a/test-ng/tests/org/openjdk/tests/java/util/stream/op/SortedOpTest.java	Wed Nov 21 11:01:36 2012 +0100
+++ b/test-ng/tests/org/openjdk/tests/java/util/stream/op/SortedOpTest.java	Wed Nov 21 15:06:06 2012 +0100
@@ -135,7 +135,7 @@
     private static class TestParallelSizedOp<T> extends CollectorOps.Parallel<T> {
         @Override
         public <P_IN> Node<T> evaluateParallel(ParallelPipelineHelper<P_IN, T> helper) {
-            int flags = helper.getStreamFlags();
+            int flags = helper.getFlags();
 
             assertTrue(StreamOpFlags.SIZED.isKnown(flags));
             return super.evaluateParallel(helper);
--- a/test-ng/tests/org/openjdk/tests/java/util/stream/op/UniqOpTest.java	Wed Nov 21 11:01:36 2012 +0100
+++ b/test-ng/tests/org/openjdk/tests/java/util/stream/op/UniqOpTest.java	Wed Nov 21 15:06:06 2012 +0100
@@ -103,7 +103,7 @@
     private static class TestParallelSizedOp<T> extends CollectorOps.Parallel<T> {
         @Override
         public <P_IN> Node<T> evaluateParallel(ParallelPipelineHelper<P_IN, T> helper) {
-            int flags = helper.getStreamFlags();
+            int flags = helper.getFlags();
 
             assertTrue(StreamOpFlags.SIZED.isKnown(flags));
             return super.evaluateParallel(helper);