changeset 7566:dbff2adcb22f

- Merge ForEachUntilOp and ForEachOp into a factory ForEachOps. - Merge ForEachOp and TerminalSink to reduce layers.
author psandoz
date Wed, 06 Mar 2013 11:07:03 +0100
parents f971c0bb050e
children 5c52b1a2c571
files src/share/classes/java/util/stream/DistinctOp.java src/share/classes/java/util/stream/DoublePipeline.java src/share/classes/java/util/stream/ForEachOp.java src/share/classes/java/util/stream/ForEachOps.java src/share/classes/java/util/stream/ForEachUntilOp.java src/share/classes/java/util/stream/IntPipeline.java src/share/classes/java/util/stream/LongPipeline.java src/share/classes/java/util/stream/ReferencePipeline.java
diffstat 8 files changed, 478 insertions(+), 375 deletions(-) [+]
line wrap: on
line diff
--- a/src/share/classes/java/util/stream/DistinctOp.java	Tue Mar 05 21:28:50 2013 -0800
+++ b/src/share/classes/java/util/stream/DistinctOp.java	Wed Mar 06 11:07:03 2013 +0100
@@ -126,7 +126,7 @@
             // Holder of null state since ConcurrentHashMap does not support null values
             AtomicBoolean seenNull = new AtomicBoolean(false);
             ConcurrentHashMap<T, Boolean> map = new ConcurrentHashMap<>();
-            TerminalOp<T, Void> forEachOp = ForEachOp.makeRef(t -> {
+            TerminalOp<T, Void> forEachOp = ForEachOps.makeRef(t -> {
                 if (t == null)
                     seenNull.set(true);
                 else
--- a/src/share/classes/java/util/stream/DoublePipeline.java	Tue Mar 05 21:28:50 2013 -0800
+++ b/src/share/classes/java/util/stream/DoublePipeline.java	Wed Mar 06 11:07:03 2013 +0100
@@ -258,12 +258,12 @@
 
     @Override
     public void forEach(DoubleConsumer consumer) {
-        pipeline(ForEachOp.makeDouble(consumer));
+        pipeline(ForEachOps.makeDouble(consumer));
     }
 
     @Override
     public void forEachUntilCancelled(DoubleConsumer consumer, BooleanSupplier until) {
-        pipeline(ForEachUntilOp.make(consumer, until));
+        pipeline(ForEachOps.makeDouble(consumer, until));
     }
 
     @Override
--- a/src/share/classes/java/util/stream/ForEachOp.java	Tue Mar 05 21:28:50 2013 -0800
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,191 +0,0 @@
-/*
- * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved.
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This code is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 only, as
- * published by the Free Software Foundation.  Oracle designates this
- * particular file as subject to the "Classpath" exception as provided
- * by Oracle in the LICENSE file that accompanied this code.
- *
- * This code is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
- * version 2 for more details (a copy is included in the LICENSE file that
- * accompanied this code).
- *
- * You should have received a copy of the GNU General Public License version
- * 2 along with this work; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
- *
- * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
- * or visit www.oracle.com if you need additional information or have any
- * questions.
- */
-package java.util.stream;
-
-import java.util.Spliterator;
-import java.util.concurrent.CountedCompleter;
-import java.util.function.Consumer;
-import java.util.function.DoubleConsumer;
-import java.util.function.IntConsumer;
-import java.util.function.LongConsumer;
-import java.util.Objects;
-
-/**
- * A {@code TerminalOp} that evaluates a stream pipeline and sends the output
- * into a {@code TerminalSink}.  Elements will be passed to the
- * {@code TerminalSink} in whatever thread and whatever order they become
- * available, independent of the stream's encounter order.
- * @param <T> The output type of the stream pipeline
- * @since 1.8
- */
-class ForEachOp<T> implements TerminalOp<T, Void> {
-    protected final TerminalSink<T, Void> sink;
-    protected final StreamShape shape;
-
-    /**
-     * Construct a {@code ForEachOp} that sends the stream output to the
-     * provided {@code TerminalSink}
-     *
-     * @param sink The {@code TerminalSink} to send stream output to
-     * @param shape The output shape of the stream pipeline
-     */
-    protected ForEachOp(TerminalSink<T, Void> sink, StreamShape shape) {
-        this.shape = Objects.requireNonNull(shape);
-        this.sink = Objects.requireNonNull(sink);
-    }
-
-    /** Specialization of {@code TerminalSink} with void result */
-    interface VoidTerminalSink<T> extends TerminalSink<T, Void> {
-        @Override
-        default Void get() {
-            return null;
-        }
-
-        /** Specialization of {@code TerminalSink} and {@code Sink.OfInt} with
-         * void result */
-        interface OfInt extends VoidTerminalSink<Integer>, Sink.OfInt { }
-
-        /** Specialization of {@code TerminalSink} and {@code Sink.OfLong} with
-         * void result */
-        interface OfLong extends VoidTerminalSink<Long>, Sink.OfLong { }
-
-        /** Specialization of {@code TerminalSink} and {@code Sink.OfDouble}
-         * with void result */
-        interface OfDouble extends VoidTerminalSink<Double>, Sink.OfDouble { }
-    }
-
-    /**
-     * Construct a {@code ForEachOp} that reads from a {@code Stream} and sends
-     * the stream output to the provided {@code Consumer}
-     *
-     * @param consumer The {@code Consumer} to send stream output to
-     */
-    public static <T> TerminalOp<T, Void> makeRef(final Consumer<? super T> consumer) {
-        return new ForEachOp<>((VoidTerminalSink<T>) consumer::accept, StreamShape.REFERENCE);
-    }
-
-    /**
-     * Construct a {@code ForEachOp} that reads from an {@code IntStream} and
-     * sends the stream output to the provided {@code IntConsumer}
-     *
-     * @param consumer The {@code IntConsumer} to send stream output to
-     */
-    public static TerminalOp<Integer, Void> makeInt(final IntConsumer consumer) {
-        return new ForEachOp<>((VoidTerminalSink.OfInt) consumer::accept, StreamShape.INT_VALUE);
-    }
-
-    /**
-     * Construct a {@code ForEachOp} that reads from a {@code LongStream} and
-     * sends the stream output to the provided {@code LongConsumer}
-     *
-     * @param consumer The {@code LongConsumer} to send stream output to
-     */
-    public static TerminalOp<Long, Void> makeLong(final LongConsumer consumer) {
-        return new ForEachOp<>((VoidTerminalSink.OfLong) consumer::accept, StreamShape.LONG_VALUE);
-    }
-
-    /**
-     * Construct a {@code ForEachOp} that reads from a {@code DoubleStream} and
-     * sends the stream output to the provided {@code DoubleConsumer}
-     *
-     * @param consumer The {@code DoubleConsumer} to send stream output to
-     */
-    public static TerminalOp<Double, Void> makeDouble(final DoubleConsumer consumer) {
-        return new ForEachOp<>((VoidTerminalSink.OfDouble) consumer::accept, StreamShape.DOUBLE_VALUE);
-    }
-
-    @Override
-    public int getOpFlags() {
-        return StreamOpFlag.NOT_ORDERED;
-    }
-
-    @Override
-    public StreamShape inputShape() {
-        return shape;
-    }
-
-    @Override
-    public <S> Void evaluateSequential(PipelineHelper<S, T> helper) {
-        return helper.into(sink, helper.sourceSpliterator()).get();
-    }
-
-    @Override
-    public <S> Void evaluateParallel(PipelineHelper<S, T> helper) {
-        new ForEachTask<>(helper, helper.wrapSink(sink)).invoke();
-        return null;
-    }
-
-    /** A {@code ForkJoinTask} for performing a parallel for-each operation */
-    private static class ForEachTask<S, T> extends CountedCompleter<Void> {
-        private Spliterator<S> spliterator;
-        private final Sink<S> sink;
-        private final PipelineHelper<S, T> helper;
-        private final long targetSize;
-
-        ForEachTask(PipelineHelper<S, T> helper, Sink<S> sink) {
-            super(null);
-            this.spliterator = helper.sourceSpliterator();
-            this.sink = sink;
-            this.targetSize = AbstractTask.suggestTargetSize(spliterator.estimateSize());
-            this.helper = helper;
-        }
-
-        ForEachTask(ForEachTask<S, T> parent, Spliterator<S> spliterator) {
-            super(parent);
-            this.spliterator = spliterator;
-            this.sink = parent.sink;
-            this.targetSize = parent.targetSize;
-            this.helper = parent.helper;
-        }
-
-        public void compute() {
-            doCompute(this);
-        }
-
-        private static <S, T> void doCompute(ForEachTask<S, T> task) {
-            boolean isShortCircuit = StreamOpFlag.SHORT_CIRCUIT.isKnown(task.helper.getStreamAndOpFlags());
-            while (true) {
-                if (isShortCircuit && task.sink.cancellationRequested()) {
-                    task.propagateCompletion();
-                    task.spliterator = null;
-                    return;
-                }
-
-                Spliterator<S> split = null;
-                if (!AbstractTask.suggestSplit(task.helper, task.spliterator, task.targetSize)
-                    || (split = task.spliterator.trySplit()) == null) {
-                    task.helper.intoWrapped(task.sink, task.spliterator);
-                    task.propagateCompletion();
-                    task.spliterator = null;
-                    return;
-                }
-                else {
-                    task.addToPendingCount(1);
-                    new ForEachTask<>(task, split).fork();
-                }
-            }
-        }
-    }
-}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/share/classes/java/util/stream/ForEachOps.java	Wed Mar 06 11:07:03 2013 +0100
@@ -0,0 +1,469 @@
+/*
+ * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.  Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+package java.util.stream;
+
+import java.util.Spliterator;
+import java.util.concurrent.CountedCompleter;
+import java.util.function.BooleanSupplier;
+import java.util.function.Consumer;
+import java.util.function.DoubleConsumer;
+import java.util.function.IntConsumer;
+import java.util.function.LongConsumer;
+import java.util.Objects;
+
+/**
+ * A factory for the creating instances of {@code TerminalOp) that implement
+ * {@code forEach} or {@code forEachUntil} traversal over elements of a stream.
+ *
+ * <p>{@code forEach} traverses all elements of a stream and sends those
+ * elements to a {@code Consumer}.
+ *
+ * <p>{@code forEachUntil} traverses elements of a stream and sends those
+ * elements to to a {@code Consumer} until a {@code BooleanProvider} indicates
+ * that a termination criteria has occurred and no more elements should be
+ * traversed and sent.
+ *
+ * <p>For either type of traversal elements will be sent to the {@code Consumer}
+ * in whatever thread and whatever order they become available, independent of
+ * the stream's encounter order.
+ *
+ * <p>Exceptions occurring as a result of sending an element to the
+ * {@code Consumer} will be relayed to the caller and traversal will be
+ * prematurely terminated.
+ *
+ * @apiNote
+ * The termination condition is an externally-imposed criteria, and is useful
+ * for problems like "find the best answer that can be found in ten seconds",
+ * "search until you find an answer at least as good as X" , etc.  It is not
+ * designed to provide content-based cancellation, such as "process elements
+ * until you find one which matches a given criteria."
+ *
+ * <p>There is no guarantee that additional elements will not be traversed and
+ * sent after the termination criteria has transpired.  For example, a
+ * termination criteria of {@code resultSet.size() > TARGET} does not guarantee
+ * that the result set will receive no more than {@code TARGET} elements, but
+ * instead that {@code forEachUntil} traversal will attempt to stop after
+ * {@code TARGET} elements have been placed in the {@code resultSet}.
+ *
+ * @since 1.8
+ */
+final class ForEachOps {
+
+    private ForEachOps() { }
+
+    /**
+     * Constructs a {@code TerminalOp} that implements {@code forEach}
+     * traversal, which traverses all elements of a {@code Stream} and sends
+     * those elements the provided {@code Consumer}.
+     *
+     * @param consumer The {@code Consumer} that receives all elements of a
+     *        stream
+     * @param <T> The type of the stream elements
+     * @return the {@code TerminalOp} instance
+     */
+    public static <T> TerminalOp<T, Void> makeRef(Consumer<? super T> consumer) {
+        Objects.requireNonNull(consumer);
+        return new ForEachOp.OfRef<>(consumer);
+    }
+
+    /**
+     * Constructs a {@code TerminalOp} that implements {@code forEachUntil}
+     * traversal, which traverses all elements of a {@code Stream} and sends
+     * those elements to the provided {@code Consumer} until the specified
+     * {@code BooleanProvider} indicates that a termination criteria has
+     * occurred and no more elements should be traversed and sent.
+     *
+     * @param consumer The {@code Consumer} that receives elements of a stream
+     * @param until A {@code BooleanSupplier} that indicates whether the
+     *        termination criteria has occurred.  Once it returns {@code true}
+     *        the first time, it must continue to return {@code true} for all
+     *        future invocations
+     * @param <T> The type of the stream elements
+     * @return the {@code TerminalOp} instance
+     */
+    public static <T> TerminalOp<T, Void> makeRef(Consumer<? super T> consumer, BooleanSupplier until) {
+        Objects.requireNonNull(consumer);
+        Objects.requireNonNull(until);
+        return new ForEachOp.OfRef.Until<>(consumer, until);
+    }
+
+    /**
+     * Constructs a {@code TerminalOp} that implements {@code forEach}
+     * traversal, which traverses all {@code int} elements of a
+     * {@code IntStream} and sends those elements the provided
+     * {@code IntConsumer}.
+     *
+     * @param consumer The {@code IntConsumer} that receives all elements of a
+     *        stream
+     * @return the {@code TerminalOp} instance
+     */
+    public static TerminalOp<Integer, Void> makeInt(IntConsumer consumer) {
+        Objects.requireNonNull(consumer);
+        return new ForEachOp.OfInt(consumer);
+    }
+
+    /**
+     * Constructs a {@code TerminalOp} that implements {@code forEachUntil}
+     * traversal, which traverses all {@code int} elements of a
+     * {@code IntStream} and sends those elements to the provided
+     * {@code IntConsumer} until the specified {@code BooleanProvider} indicates
+     * that a termination criteria has occurred and no more elements should be
+     * traversed and sent.
+     *
+     * @param consumer The {@code IntConsumer} that receives elements of a
+     *        stream
+     * @param until A {@code BooleanSupplier} that indicates whether the
+     *        termination criteria has occurred.  Once it returns {@code true}
+     *        the first time, it must continue to return {@code true} for all
+     *        future invocations
+     * @return the {@code TerminalOp} instance
+     */
+    public static TerminalOp<Integer, Void> makeInt(IntConsumer consumer, BooleanSupplier until) {
+        Objects.requireNonNull(consumer);
+        Objects.requireNonNull(until);
+        return new ForEachOp.OfInt.Until(consumer, until);
+    }
+
+    /**
+     * Constructs a {@code TerminalOp} that implements {@code forEach}
+     * traversal, which traverses all {@code long} elements of a
+     * {@code LongStream} and sends those elements the provided
+     * {@code LongConsumer}.
+     *
+     * @param consumer The {@code LongConsumer} that receives all elements of a
+     *        stream
+     * @return the {@code TerminalOp} instance
+     */
+    public static TerminalOp<Long, Void> makeLong(LongConsumer consumer) {
+        Objects.requireNonNull(consumer);
+        return new ForEachOp.OfLong(consumer);
+    }
+
+    /**
+     * Constructs a {@code TerminalOp} that implements {@code forEachUntil}
+     * traversal, which traverses all {@code long} elements of a
+     * {@code LongStream} and sends those elements to the provided
+     * {@code LongConsumer} until the specified {@code BooleanProvider}
+     * indicates that a termination criteria has occurred and no more elements
+     * should be traversed and sent.
+     *
+     * @param consumer The {@code LongConsumer} that receives elements of a
+     *        stream
+     * @param until A {@code BooleanSupplier} that indicates whether the
+     *        termination criteria has occurred.  Once it returns {@code true}
+     *        the first time, it must continue to return {@code true} for all
+     *        future invocations
+     * @return the {@code TerminalOp} instance
+     */
+    public static TerminalOp<Long, Void> makeLong(LongConsumer consumer, BooleanSupplier until) {
+        Objects.requireNonNull(consumer);
+        Objects.requireNonNull(until);
+        return new ForEachOp.OfLong.Until(consumer, until);
+    }
+
+    /**
+     * Constructs a {@code TerminalOp} that implements {@code forEach}
+     * traversal, which traverses all {@code double} elements of a
+     * {@code DoubleStream} and sends those elements the provided
+     * {@code DoubleConsumer}.
+     *
+     * @param consumer The {@code DoubleConsumer} that receives all elements of
+     *        a stream
+     * @return the {@code TerminalOp} instance
+     */
+    public static TerminalOp<Double, Void> makeDouble(DoubleConsumer consumer) {
+        Objects.requireNonNull(consumer);
+        return new ForEachOp.OfDouble(consumer);
+    }
+
+    /**
+     * Constructs a {@code TerminalOp} that implements {@code forEachUntil}
+     * traversal, which traverses all {@code double} elements of a
+     * {@code DoubleStream} and sends those elements to the provided
+     * {@code DoubleConsumer} until the specified {@code BooleanProvider}
+     * indicates that a termination criteria has occurred and no more elements
+     * should be traversed and sent.
+     *
+     * @param consumer The {@code DoubleConsumer} that receives elements of a
+     *        stream
+     * @param until A {@code BooleanSupplier} that indicates whether the
+     *        termination criteria has occurred.  Once it returns {@code true}
+     *        the first time, it must continue to return {@code true} for all
+     *        future invocations
+     * @return the {@code TerminalOp} instance
+     */
+    public static TerminalOp<Double, Void> makeDouble(DoubleConsumer consumer, BooleanSupplier until) {
+        Objects.requireNonNull(consumer);
+        Objects.requireNonNull(until);
+        return new ForEachOp.OfDouble.Until(consumer, until);
+    }
+
+    /**
+     * A {@code TerminalOp} that evaluates a stream pipeline and sends the
+     * output to itself as a {@code TerminalSink}.  Elements will be sent in
+     * whatever thread and whatever order they become available, independent of
+     * the stream's encounter order.
+     *
+     * <p>This terminal operation is stateless.  For parallel evaluation each
+     * leaf instance of a {@code ForEachTask} will send elements to the same
+     * {@code TerminalSink} reference that is an instance of this class.  State
+     * management, if any, is deferred to the consumer, held by the concrete
+     * sub-classes, that is the final receiver elements.
+     *
+     * @param <T> The output type of the stream pipeline
+     */
+    private static abstract class ForEachOp<T> implements TerminalOp<T, Void>, TerminalSink<T, Void> {
+
+        // TerminalOp
+
+        @Override
+        public int getOpFlags() {
+            return StreamOpFlag.NOT_ORDERED;
+        }
+
+        @Override
+        public <S> Void evaluateSequential(PipelineHelper<S, T> helper) {
+            return helper.into(this, helper.sourceSpliterator()).get();
+        }
+
+        @Override
+        public <S> Void evaluateParallel(PipelineHelper<S, T> helper) {
+            new ForEachTask<>(helper, helper.wrapSink(this)).invoke();
+            return null;
+        }
+
+        // TerminalSink
+
+        @Override
+        public Void get() {
+            return null;
+        }
+
+        // Implementations
+
+        /** {@code forEach} with {@code Stream} */
+        private static class OfRef<T> extends ForEachOp<T> {
+            final Consumer<? super T> consumer;
+
+            OfRef(Consumer<? super T> consumer) {
+                this.consumer = consumer;
+            }
+
+            @Override
+            public void accept(T t) {
+                consumer.accept(t);
+            }
+
+            /** {@code forEachUntil} with {@code Stream} */
+            static final class Until<T> extends ForEachOp.OfRef<T> {
+                final BooleanSupplier until;
+
+                Until(Consumer<? super T> consumer, BooleanSupplier until) {
+                    super(consumer);
+                    this.until = until;
+                }
+
+                @Override
+                public int getOpFlags() {
+                    return StreamOpFlag.NOT_ORDERED | StreamOpFlag.IS_SHORT_CIRCUIT;
+                }
+
+                @Override
+                public boolean cancellationRequested() {
+                    return until.getAsBoolean();
+                }
+            }
+        }
+
+        /** {@code forEach} with {@code IntStream} */
+        private static class OfInt extends ForEachOp<Integer> implements Sink.OfInt {
+            final IntConsumer consumer;
+
+            OfInt(IntConsumer consumer) {
+                this.consumer = consumer;
+            }
+
+            @Override
+            public StreamShape inputShape() {
+                return StreamShape.INT_VALUE;
+            }
+
+            @Override
+            public void accept(int t) {
+                consumer.accept(t);
+            }
+
+            /** {@code forEachUntil} with {@code IntStream} */
+            static final class Until extends ForEachOp.OfInt {
+                final BooleanSupplier until;
+
+                Until(IntConsumer consumer, BooleanSupplier until) {
+                    super(consumer);
+                    this.until = until;
+                }
+
+                @Override
+                public int getOpFlags() {
+                    return StreamOpFlag.NOT_ORDERED | StreamOpFlag.IS_SHORT_CIRCUIT;
+                }
+
+                @Override
+                public boolean cancellationRequested() {
+                    return until.getAsBoolean();
+                }
+            }
+        }
+
+        /** {@code forEach} with {@code LongStream} */
+        private static class OfLong extends ForEachOp<Long> implements Sink.OfLong {
+            final LongConsumer consumer;
+
+            OfLong(LongConsumer consumer) {
+                this.consumer = consumer;
+            }
+
+            @Override
+            public StreamShape inputShape() {
+                return StreamShape.LONG_VALUE;
+            }
+
+            @Override
+            public void accept(long t) {
+                consumer.accept(t);
+            }
+
+            /** {@code forEachUntil} with {@code LongStream} */
+            private static final class Until extends ForEachOp.OfLong {
+                final BooleanSupplier until;
+
+                Until(LongConsumer consumer, BooleanSupplier until) {
+                    super(consumer);
+                    this.until = until;
+                }
+
+                @Override
+                public int getOpFlags() {
+                    return StreamOpFlag.NOT_ORDERED | StreamOpFlag.IS_SHORT_CIRCUIT;
+                }
+
+                @Override
+                public boolean cancellationRequested() {
+                    return until.getAsBoolean();
+                }
+            }
+        }
+
+        /** {@code forEach} with {@code DoubleStream} */
+        private static class OfDouble extends ForEachOp<Double> implements Sink.OfDouble {
+            final DoubleConsumer consumer;
+
+            OfDouble(DoubleConsumer consumer) {
+                this.consumer = consumer;
+            }
+
+            @Override
+            public StreamShape inputShape() {
+                return StreamShape.DOUBLE_VALUE;
+            }
+
+            @Override
+            public void accept(double t) {
+                consumer.accept(t);
+            }
+
+            /** {@code forEachUntil} with {@code DoubleStream} */
+            private static final class Until extends ForEachOp.OfDouble {
+                final BooleanSupplier until;
+
+                Until(DoubleConsumer consumer, BooleanSupplier until) {
+                    super(consumer);
+                    this.until = until;
+                }
+
+                @Override
+                public int getOpFlags() {
+                    return StreamOpFlag.NOT_ORDERED | StreamOpFlag.IS_SHORT_CIRCUIT;
+                }
+
+                @Override
+                public boolean cancellationRequested() {
+                    return until.getAsBoolean();
+                }
+            }
+        }
+    }
+
+    /** A {@code ForkJoinTask} for performing a parallel for-each operation */
+    private static class ForEachTask<S, T> extends CountedCompleter<Void> {
+        private Spliterator<S> spliterator;
+        private final Sink<S> sink;
+        private final PipelineHelper<S, T> helper;
+        private final long targetSize;
+
+        ForEachTask(PipelineHelper<S, T> helper, Sink<S> sink) {
+            super(null);
+            this.spliterator = helper.sourceSpliterator();
+            this.sink = sink;
+            this.targetSize = AbstractTask.suggestTargetSize(spliterator.estimateSize());
+            this.helper = helper;
+        }
+
+        ForEachTask(ForEachTask<S, T> parent, Spliterator<S> spliterator) {
+            super(parent);
+            this.spliterator = spliterator;
+            this.sink = parent.sink;
+            this.targetSize = parent.targetSize;
+            this.helper = parent.helper;
+        }
+
+        public void compute() {
+            doCompute(this);
+        }
+
+        private static <S, T> void doCompute(ForEachTask<S, T> task) {
+            boolean isShortCircuit = StreamOpFlag.SHORT_CIRCUIT.isKnown(task.helper.getStreamAndOpFlags());
+            while (true) {
+                if (isShortCircuit && task.sink.cancellationRequested()) {
+                    task.propagateCompletion();
+                    task.spliterator = null;
+                    return;
+                }
+
+                Spliterator<S> split = null;
+                if (!AbstractTask.suggestSplit(task.helper, task.spliterator, task.targetSize)
+                    || (split = task.spliterator.trySplit()) == null) {
+                    task.helper.intoWrapped(task.sink, task.spliterator);
+                    task.propagateCompletion();
+                    task.spliterator = null;
+                    return;
+                }
+                else {
+                    task.addToPendingCount(1);
+                    new ForEachTask<>(task, split).fork();
+                }
+            }
+        }
+    }
+}
--- a/src/share/classes/java/util/stream/ForEachUntilOp.java	Tue Mar 05 21:28:50 2013 -0800
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,175 +0,0 @@
-/*
- * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved.
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This code is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 only, as
- * published by the Free Software Foundation.  Oracle designates this
- * particular file as subject to the "Classpath" exception as provided
- * by Oracle in the LICENSE file that accompanied this code.
- *
- * This code is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
- * version 2 for more details (a copy is included in the LICENSE file that
- * accompanied this code).
- *
- * You should have received a copy of the GNU General Public License version
- * 2 along with this work; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
- *
- * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
- * or visit www.oracle.com if you need additional information or have any
- * questions.
- */
-package java.util.stream;
-
-import java.util.Objects;
-import java.util.function.BooleanSupplier;
-import java.util.function.Consumer;
-import java.util.function.DoubleConsumer;
-import java.util.function.IntConsumer;
-import java.util.function.LongConsumer;
-
-/**
- * A {@code TerminalOp} that evaluates a stream pipeline and sends the output
- * into a {@code TerminalSink}, attempting to stop early if some external
- * termination condition is reached.
- *
- * @apiNote
- * The termination condition is an externally-imposed criteria, and is useful
- * for problems like "find the best answer that can be found in ten seconds",
- * "search until you find an answer at least as good as X", etc.  It is not
- * designed to provide content-based cancellation, such as "process elements
- * until you find one which matches a given criteria."
- *
- * <p>There is no guarantee that additional elements will not be processed after
- * the termination criteria has transpired.  For example, a termination criteria
- * of {@code resultSet.size() > TARGET} does not guarantee that the result set
- * will receive no more than {@code TARGET} elements, but instead that
- * @{code ForEachUntilOp} will attempt to stop after {@code TARGET} elements
- * have been placed in the result set.
- *
- * @param <T> The output type of the stream pipeline
- * @since 1.8
- */
-final class ForEachUntilOp<T> extends ForEachOp<T> implements TerminalOp<T, Void> {
-
-    private ForEachUntilOp(TerminalSink<T, Void> sink, StreamShape shape) {
-        super(sink, shape);
-    }
-
-    /**
-     * Construct a {@code ForEachUntilOp} that reads from a {@code Stream} and
-     * sends the stream output to the provided {@code Consumer}, until the
-     * specified {@code BooleanProvider} indicates that no more input should be
-     * sent
-     *
-     * @param consumer The {@code Consumer} to send stream output to
-     * @param until A {@code BooleanSupplier} that indicates whether the
-     *        termination criteria has occurred.  Once it returns {@code true}
-     *        the first time, it must continue to return {@code true} for all
-     *        future invocations
-     * @param <T> The type of the stream elements
-     */
-    public static <T> ForEachUntilOp<T> make(final Consumer<? super T> consumer, BooleanSupplier until) {
-        Objects.requireNonNull(consumer);
-        return new ForEachUntilOp<>(new VoidTerminalSink<T>() {
-            @Override
-            public void accept(T t) {
-                consumer.accept(t);
-            }
-
-            @Override
-            public boolean cancellationRequested() {
-                return until.getAsBoolean();
-            }
-        }, StreamShape.REFERENCE);
-    }
-
-    /**
-     * Construct a {@code ForEachUntilOp} that reads from an {@code IntStream}
-     * and sends the stream output to the provided {@code Consumer}, until the
-     * specified {@code BooleanProvider} indicates that no more input should be
-     * sent
-     *
-     * @param consumer The {@code Consumer} to send stream output to
-     * @param until A {@code BooleanSupplier} that indicates whether the
-     *        termination criteria has occurred.  Once it returns {@code true}
-     *        the first time, it must continue to return {@code true} for all
-     *        future invocations
-     */
-    public static ForEachUntilOp<Integer> make(final IntConsumer consumer, BooleanSupplier until) {
-        Objects.requireNonNull(consumer);
-        return new ForEachUntilOp<>(new VoidTerminalSink.OfInt() {
-            @Override
-            public void accept(int i) {
-                consumer.accept(i);
-            }
-
-            @Override
-            public boolean cancellationRequested() {
-                return until.getAsBoolean();
-            }
-        }, StreamShape.INT_VALUE);
-    }
-
-    /**
-     * Construct a {@code ForEachUntilOp} that reads from a {@code LongStream}
-     * and sends the stream output to the provided {@code Consumer}, until the
-     * specified {@code BooleanProvider} indicates that no more input should be
-     * sent
-     *
-     * @param consumer The {@code Consumer} to send stream output to
-     * @param until A {@code BooleanSupplier} that indicates whether the
-     *        termination criteria has occurred.  Once it returns {@code true}
-     *        the first time, it must continue to return {@code true} for all
-     *        future invocations
-     */
-    public static ForEachUntilOp<Long> make(final LongConsumer consumer, BooleanSupplier until) {
-        Objects.requireNonNull(consumer);
-        return new ForEachUntilOp<>(new VoidTerminalSink.OfLong() {
-            @Override
-            public void accept(long i) {
-                consumer.accept(i);
-            }
-
-            @Override
-            public boolean cancellationRequested() {
-                return until.getAsBoolean();
-            }
-        }, StreamShape.LONG_VALUE);
-    }
-
-    /**
-     * Construct a {@code ForEachUntilOp} that reads from a {@code DoubleStream}
-     * and sends the stream output to the provided {@code Consumer}, until the
-     * specified {@code BooleanProvider} indicates that no more input should be
-     * sent
-     *
-     * @param consumer The {@code Consumer} to send stream output to
-     * @param until A {@code BooleanSupplier} that indicates whether the
-     *        termination criteria has occurred.  Once it returns {@code true}
-     *        the first time, it must continue to return {@code true} for all
-     *        future invocations
-     */
-    public static ForEachUntilOp<Double> make(final DoubleConsumer consumer, BooleanSupplier until) {
-        Objects.requireNonNull(consumer);
-        return new ForEachUntilOp<>(new VoidTerminalSink.OfDouble() {
-            @Override
-            public void accept(double i) {
-                consumer.accept(i);
-            }
-
-            @Override
-            public boolean cancellationRequested() {
-                return until.getAsBoolean();
-            }
-        }, StreamShape.DOUBLE_VALUE);
-    }
-
-    @Override
-    public int getOpFlags() {
-        return StreamOpFlag.IS_SHORT_CIRCUIT | StreamOpFlag.NOT_ORDERED;
-    }
-}
--- a/src/share/classes/java/util/stream/IntPipeline.java	Tue Mar 05 21:28:50 2013 -0800
+++ b/src/share/classes/java/util/stream/IntPipeline.java	Wed Mar 06 11:07:03 2013 +0100
@@ -280,12 +280,12 @@
 
     @Override
     public void forEach(IntConsumer consumer) {
-        pipeline(ForEachOp.makeInt(consumer));
+        pipeline(ForEachOps.makeInt(consumer));
     }
 
     @Override
     public void forEachUntilCancelled(IntConsumer consumer, BooleanSupplier until) {
-        pipeline(ForEachUntilOp.make(consumer, until));
+        pipeline(ForEachOps.makeInt(consumer, until));
     }
 
     @Override
--- a/src/share/classes/java/util/stream/LongPipeline.java	Tue Mar 05 21:28:50 2013 -0800
+++ b/src/share/classes/java/util/stream/LongPipeline.java	Wed Mar 06 11:07:03 2013 +0100
@@ -270,12 +270,12 @@
 
     @Override
     public void forEach(LongConsumer consumer) {
-        pipeline(ForEachOp.makeLong(consumer));
+        pipeline(ForEachOps.makeLong(consumer));
     }
 
     @Override
     public void forEachUntilCancelled(LongConsumer consumer, BooleanSupplier until) {
-        pipeline(ForEachUntilOp.make(consumer, until));
+        pipeline(ForEachOps.makeLong(consumer, until));
     }
 
     @Override
--- a/src/share/classes/java/util/stream/ReferencePipeline.java	Tue Mar 05 21:28:50 2013 -0800
+++ b/src/share/classes/java/util/stream/ReferencePipeline.java	Wed Mar 06 11:07:03 2013 +0100
@@ -298,12 +298,12 @@
 
     @Override
     public void forEach(Consumer<? super U> consumer) {
-        pipeline(ForEachOp.makeRef(consumer));
+        pipeline(ForEachOps.makeRef(consumer));
     }
 
     @Override
     public void forEachUntilCancelled(Consumer<? super U> consumer, BooleanSupplier cancelledFunction) {
-        pipeline(ForEachUntilOp.make(consumer, cancelledFunction));
+        pipeline(ForEachOps.makeRef(consumer, cancelledFunction));
     }
 
     @Override