OpenJDK / lambda / lambda / jdk
changeset 5863:682e571b428a it2-bootstrap
Major refactor of Stream and Pipeline hierarchies: remove Stream methods from Streamable; merge Stream and ParallelStream; base all Pipeline classes off of common AbstractPipeline
line wrap: on
line diff
--- a/make/java/java/FILES_java.gmk Mon Aug 20 16:10:37 2012 -0400 +++ b/make/java/java/FILES_java.gmk Sat Aug 25 19:03:55 2012 -0400 @@ -400,7 +400,6 @@ java/util/streams/Streams.java \ java/util/streams/MapStream.java \ java/util/streams/ParallelPipeline.java \ - java/util/streams/SizedStreamable.java \ java/util/streams/ParallelStreamable.java \ java/util/streams/ops/MapFilterKeysOp.java \ java/util/streams/ops/ToArrayOp.java \
--- a/src/share/classes/java/lang/CharSequence.java Mon Aug 20 16:10:37 2012 -0400 +++ b/src/share/classes/java/lang/CharSequence.java Sat Aug 25 19:03:55 2012 -0400 @@ -27,8 +27,10 @@ import java.util.Iterator; import java.util.NoSuchElementException; +import java.util.Traversable; import java.util.functions.Sink; -import java.util.streams.Streamable; +import java.util.streams.Stream; +import java.util.streams.Streams; /** * A <tt>CharSequence</tt> is a readable sequence of <code>char</code> values. This @@ -118,8 +120,8 @@ * * @return an Iterable of Character values from this sequence */ - public Streamable<Character> asChars() default { - return new CharacterIterable(this); + public Stream<Character> asChars() default { + return Streams.stream(new CharacterTraversable(this), length()); } /** @@ -137,15 +139,15 @@ * * @return an Iterable of Unicode code points from this sequence */ - public Streamable<Integer> asCodePoints() default { - return new CodePointIterable(this); + public Stream<Integer> asCodePoints() default { + return Streams.stream(new CodePointTraversable(this)); } } -class CharacterIterable implements Streamable<Character> { +class CharacterTraversable implements Traversable<Character> { final CharSequence cs; - CharacterIterable(CharSequence cs) { + CharacterTraversable(CharSequence cs) { this.cs = cs; } @@ -173,10 +175,10 @@ } } -class CodePointIterable implements Streamable<Integer> { +class CodePointTraversable implements Traversable<Integer> { final CharSequence cs; - CodePointIterable(CharSequence cs) { + CodePointTraversable(CharSequence cs) { this.cs = cs; }
--- a/src/share/classes/java/lang/String.java Mon Aug 20 16:10:37 2012 -0400 +++ b/src/share/classes/java/lang/String.java Sat Aug 25 19:03:55 2012 -0400 @@ -37,7 +37,7 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.regex.PatternSyntaxException; -import java.util.streams.Streamable; +import java.util.streams.Stream; /** * The {@code String} class represents character strings. All @@ -2356,11 +2356,11 @@ * the results are returned as a stream of Strings instead of * as an array. */ - public Streamable<String> splitAsStream(String regex, int limit) { + public Stream<String> splitAsStream(String regex, int limit) { // TODO: it would be better to match and return results // incrementally as they are demanded instead of precomputing // the entire array. - return Arrays.asList(split(regex, limit)); + return Arrays.asList(split(regex, limit)).stream(); } /** @@ -2383,7 +2383,7 @@ * // out is [a, b, c, ghi, jkl, mno] * </pre> */ - public Streamable<String> splitAsStream(String regex) { + public Stream<String> splitAsStream(String regex) { return splitAsStream(regex, 0); }
--- a/src/share/classes/java/util/Arrays.java Mon Aug 20 16:10:37 2012 -0400 +++ b/src/share/classes/java/util/Arrays.java Sat Aug 25 19:03:55 2012 -0400 @@ -27,7 +27,7 @@ import java.lang.reflect.*; import java.util.functions.*; -import java.util.streams.ParallelStreamable; +import java.util.streams.*; /** * This class contains various methods for manipulating arrays (such as @@ -3727,7 +3727,7 @@ } ArraySpliterator(T[] array, int offset, int length) { - this.array = array; + this.array = Objects.requireNonNull(array); this.curIndex = offset; this.endIndex = offset+length; } @@ -3772,34 +3772,53 @@ } } - private static class ArraySplittable<T> implements ParallelStreamable<T> { - private final T[] array; - private final int offset; - private final int length; - - ArraySplittable(T[] array) { + private static class ArrayParallelStreamAccessor<T> + extends ArraySpliterator<T> + implements StreamAccessor<T>, Iterator<T>, Spliterator<T> { + private final int size; + + ArrayParallelStreamAccessor(T[] array) { this(array, 0, array.length); } - ArraySplittable(T[] array, int offset, int length) { - this.array = Objects.requireNonNull(array); - this.offset = offset; - this.length = length; + ArrayParallelStreamAccessor(T[] array, int offset, int length) { + super(array, offset, length); + this.size = length; + } + + @Override + public int getStreamFlags() { + return Stream.STATE_SIZED; + } + + @Override + public int getSizeOrEstimate() { + return size; + } + + @Override + public boolean isParallel() { + return true; + } + + @Override + public Stream.Shape getShape() { + return Stream.Shape.LINEAR; + } + + @Override + public Iterator<T> iterator() { + return this; } @Override public Spliterator<T> spliterator() { - return new ArraySpliterator<>(array, offset, length); + return this; } @Override - public Iterator<T> iterator() { - return new ArrayIterator<>(array, offset, length); - } - - @Override - public int size() { - return length; + public boolean isPredictableSplits() { + return true; } } @@ -3830,31 +3849,47 @@ public static<T> Iterator<T> iterator(T[] array) { - return new ArrayIterator<>(array, 0, array.length); + return iterator(array, 0, array.length); } public static<T> Iterator<T> iterator(T[] array, int offset, int length) { return new ArrayIterator<>(array, offset, length); } + public static<T> Spliterator<T> spliterator(T[] array) { + return spliterator(array, 0, array.length); + } + + public static<T> Spliterator<T> spliterator(T[] array, int offset, int length) { + return new ArraySpliterator<>(array, offset, length); + } + public static<T> Iterable<T> iterable(T[] array) { - return () -> new ArrayIterator<>(array, 0, array.length); + return iterable(array, 0, array.length); } public static<T> Iterable<T> iterable(T[] array, int offset, int length) { return () -> new ArrayIterator<>(array, offset, length); } - public static<T> ParallelStreamable<T> parallel(T[] array) { - return new ArraySplittable<>(array); + public static<T> Stream<T> stream(T[] array) { + return stream(array, 0, array.length); } - public static<T> ParallelStreamable<T> parallel(T[] array, int offset, int length) { - return new ArraySplittable<>(array, offset, length); + public static<T> Stream<T> stream(T[] array, int offset, int length) { + return Streams.stream(new ArrayIterator<>(array, offset, length), length); } + public static<T> Stream<T> parallel(T[] array) { + return parallel(array, 0, array.length); + } + + public static<T> Stream<T> parallel(T[] array, int offset, int length) { + return Streams.parallel(new ArrayParallelStreamAccessor<>(array, offset, length)); + } + public static<T> Sink<T> sink(T[] array) { - return new ArraySink<>(array); + return sink(array, 0, array.length); } public static<T> Sink<T> sink(T[] array, int offset, int length) {
--- a/src/share/classes/java/util/Collection.java Mon Aug 20 16:10:37 2012 -0400 +++ b/src/share/classes/java/util/Collection.java Sat Aug 25 19:03:55 2012 -0400 @@ -26,7 +26,10 @@ package java.util; import java.util.functions.Predicate; -import java.util.streams.SizedStreamable; +import java.util.functions.Sink; +import java.util.streams.Stream; +import java.util.streams.Streamable; +import java.util.streams.Streams; /** * The root interface in the <i>collection hierarchy</i>. A collection @@ -127,7 +130,7 @@ * @since 1.2 */ -public interface Collection<E> extends SizedStreamable<E>, Iterable<E>, Fillable<E> { +public interface Collection<E> extends Sized, Streamable<E>, Traversable<E>, Iterable<E>, Fillable<E> { // Query Operations /** @@ -457,19 +460,6 @@ int hashCode(); /** - * Retains all of the elements of this collection which match the provided - * predicate. - * - * @param filter a predicate which returns {@code true} for elements to be - * retained. - * @return {@code true} if any elements were retained. - */ - // XXX potential source incompatibility with retainAll(null) now being ambiguous - boolean retainAll(Predicate<? super E> filter) default { - return CollectionHelpers.retainAll(this, filter); - } - - /** * Removes all of the elements of this collection which match the provided * predicate. * @@ -477,12 +467,37 @@ * removed. * @return {@code true} if any elements were removed. */ - // XXX potential source incompatibility with removeAll(null) now being ambiguous - boolean removeAll(Predicate<? super E> filter) default { - return CollectionHelpers.removeAll(this, filter); + boolean removeIf(Predicate<? super E> filter) default { + boolean removed = false; + Iterator<E> each = iterator(); + while(each.hasNext()) { + if(filter.test(each.next())) { + each.remove(); + removed = true; + } + } + + return removed; } void addAll(Iterable<? extends E> source) default { - CollectionHelpers.addAll(this, source); + if (source instanceof Collection) + addAll((Collection<? extends E>) source); + else { + for (E e : source) + add(e); + } + } + + @Override + Stream<E> stream() default { + return Streams.stream(this, size()); + } + + @Override + void forEach(Sink<? super E> sink) default { + for (E e: this) { + sink.accept(e); + } } }
--- a/src/share/classes/java/util/CollectionHelpers.java Mon Aug 20 16:10:37 2012 -0400 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,87 +0,0 @@ -/* - * Copyright (c) 1997, 2010, Oracle and/or its affiliates. All rights reserved. - * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. - * - * This code is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License version 2 only, as - * published by the Free Software Foundation. Oracle designates this - * particular file as subject to the "Classpath" exception as provided - * by Oracle in the LICENSE file that accompanied this code. - * - * This code is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - * version 2 for more details (a copy is included in the LICENSE file that - * accompanied this code). - * - * You should have received a copy of the GNU General Public License version - * 2 along with this work; if not, write to the Free Software Foundation, - * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. - * - * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA - * or visit www.oracle.com if you need additional information or have any - * questions. - */ -package java.util; - -import java.util.functions.Predicate; - -/** - * Extension method implementations for {@code Collection}. - */ -public final class CollectionHelpers { - /** - * Retains only elements of the provided collection that return {@code true} - * for the provided predicate. - * - * @param <E> type of collection elements. - * @param collection the collection containing the elements to be tested. - * @param filter predicate against which elements will be tested. - * @return The same collection instance as provided. - */ - public static <E> boolean retainAll(Collection<E> collection, Predicate<? super E> filter ) { - boolean retained = false; - Iterator<E> each = collection.iterator(); - while(each.hasNext()) { - if(!filter.test(each.next())) { - each.remove(); - } else { - retained = true; - } - - } - - return retained; - } - - /** - * Remove elements of the provided collection that return {@code true} - * for the provided predicate. - * - * @param <E> type of collection elements. - * @param collection the collection containing the elements to be tested. - * @param filter predicate against which elements will be tested. - * @return The same collection instance as provided. - */ - public static <E> boolean removeAll(Collection<E> collection, Predicate<? super E> filter ) { - boolean removed = false; - Iterator<E> each = collection.iterator(); - while(each.hasNext()) { - if(filter.test(each.next())) { - each.remove(); - removed = true; - } - } - - return removed; - } - - public static<E> void addAll(Collection<E> collection, Iterable<? extends E> iterable) { - if (iterable instanceof Collection) - collection.addAll((Collection<? extends E>) iterable); - else { - for (E e : iterable) - collection.add(e); - } - } -}
--- a/src/share/classes/java/util/MapIterator.java Mon Aug 20 16:10:37 2012 -0400 +++ b/src/share/classes/java/util/MapIterator.java Sat Aug 25 19:03:55 2012 -0400 @@ -69,11 +69,11 @@ */ public static class IteratorAdapter<K, V> implements MapIterator<K, V> { - protected final Iterator<? super Mapping<K,V>> source; + protected final Iterator<? extends Mapping<K,V>> source; protected Mapping<K, V> current; - public IteratorAdapter(Iterator<? super Mapping<K, V>> source) { + public IteratorAdapter(Iterator<? extends Mapping<K, V>> source) { this.source = Objects.requireNonNull(source); } @@ -112,7 +112,8 @@ @Override public Mapping<K, V> next() { - return (current = (Mapping<K,V>) source.next()); + current = source.next(); + return current; } } }
--- a/src/share/classes/java/util/StringJoiner.java Mon Aug 20 16:10:37 2012 -0400 +++ b/src/share/classes/java/util/StringJoiner.java Sat Aug 25 19:03:55 2012 -0400 @@ -24,7 +24,7 @@ */ package java.util; -import java.util.streams.Streamable; +import java.util.streams.Stream; /** * StringJoiner is used to construct a sequence of characters separated @@ -334,7 +334,7 @@ * @return the current value of StringJoiner */ @Override - public Streamable<Character> asChars() { + public Stream<Character> asChars() { return (somethingAdded ? (value.toString() + suffix).asChars() : emptyOutput.asChars() ); } @@ -352,7 +352,7 @@ * @return the current value of StringJoiner */ @Override - public Streamable<Integer> asCodePoints() { + public Stream<Integer> asCodePoints() { return (somethingAdded ? (value.toString() + suffix).asCodePoints() : emptyOutput.asCodePoints() ); }
--- a/src/share/classes/java/util/Traversable.java Mon Aug 20 16:10:37 2012 -0400 +++ b/src/share/classes/java/util/Traversable.java Sat Aug 25 19:03:55 2012 -0400 @@ -40,5 +40,8 @@ * * @param sink The Sink to which elements will be provided. */ - public void forEach(Sink<? super T> sink); + public void forEach(Sink<? super T> sink) default { + for (T t : this) + sink.accept(t); + } }
--- a/src/share/classes/java/util/functions/BiSink.java Mon Aug 20 16:10:37 2012 -0400 +++ b/src/share/classes/java/util/functions/BiSink.java Sat Aug 25 19:03:55 2012 -0400 @@ -25,6 +25,8 @@ package java.util.functions; import java.util.Mapping; +import java.util.streams.StatefulBiSink; +import java.util.streams.StatefulSink; /** * BiSink @@ -37,4 +39,32 @@ void accept(Mapping<T,U> mapping) default { accept(mapping.getKey(), mapping.getValue()); } + + BiSink<T,U> tee(BiSink<? super T, ? super U> other) default { + return (T t, U u) -> { other.accept(t, u); accept(t, u); }; + } + + @Override + StatefulBiSink<T,U,Void> asStatefulSink() default { + if (this instanceof StatefulSink) + return (StatefulBiSink<T, U, Void>) this; + else { + // @@@ Compiler bug workaround - have to capture a fake outer 'this' + final BiSink<T,U> thiz = this; + return new StatefulBiSink<T, U, Void>() { + @Override + public void begin(int size) { } + + @Override + public Void end() { + return null; + } + + @Override + public void accept(T t, U u) { + thiz.accept(t, u); + } + }; + } + } }
--- a/src/share/classes/java/util/functions/Sink.java Mon Aug 20 16:10:37 2012 -0400 +++ b/src/share/classes/java/util/functions/Sink.java Sat Aug 25 19:03:55 2012 -0400 @@ -24,6 +24,8 @@ */ package java.util.functions; +import java.util.streams.StatefulSink; + /** * A receiver of elements. The counterpart to {@code Stream}. * @@ -42,4 +44,27 @@ public Sink<T> tee(Sink<? super T> other) default { return (T t) -> { other.accept(t); accept(t); }; } + + public StatefulSink<T, Void> asStatefulSink() default { + if (this instanceof StatefulSink) + return (StatefulSink<T, Void>) this; + else { + // @@@ Compiler bug workaround - have to capture a fake outer 'this' + final Sink<T> thiz = this; + return new StatefulSink<T, Void>() { + @Override + public void begin(int size) { } + + @Override + public Void end() { + return null; + } + + @Override + public void accept(T t) { + thiz.accept(t); + } + }; + } + } }
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/share/classes/java/util/streams/AbstractPipeline.java Sat Aug 25 19:03:55 2012 -0400 @@ -0,0 +1,218 @@ +/* + * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. Oracle designates this + * particular file as subject to the "Classpath" exception as provided + * by Oracle in the LICENSE file that accompanied this code. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ +package java.util.streams; + +import java.util.*; +import java.util.concurrent.ForkJoinTask; +import java.util.concurrent.ForkJoinUtils; +import java.util.functions.Sink; +import java.util.streams.ops.*; + +/** + * AbstractPipeline + * + * @author Brian Goetz + */ +public abstract class AbstractPipeline<T, U> { + protected final AbstractPipeline<?, T> upstream; + protected final StreamAccessor<?> source; + protected final IntermediateOp<T,U> op; + protected final int depth; + + private Iterator<U> iterator; + + protected AbstractPipeline(StreamAccessor<?> source) { + this.source = Objects.requireNonNull(source); + this.op = null; + this.upstream = null; + this.depth = 0; + } + + protected AbstractPipeline(AbstractPipeline<?, T> upstream, IntermediateOp<T,U> op) { + this.upstream = Objects.requireNonNull(upstream); + this.op = Objects.requireNonNull(op); + this.source = upstream.source; + this.depth = upstream.depth + 1; + assert upstream.getOutputShape() == op.inputShape(); + assert (upstream.depth == 0) ^ (upstream.op != null); + } + + protected<V> V evaluate(TerminalOp<U, V> terminal) { + return source.isParallel() ? evaluateParallel(terminal) : evaluateSerial(terminal); + } + + protected<V> V evaluateParallel(ParallelOp<U, V> terminal) { + IntermediateOp[] ops = new IntermediateOp[depth]; + boolean stateful = false; + boolean shortCircuit = false; + AbstractPipeline p = this; + for (int i=depth-1; i >= 0; i--) { + stateful |= p.op.isStateful(); + shortCircuit |= p.op.isShortCircuit(); + ops[i] = p.op; + p = p.upstream; + } + + if (!stateful) + return evaluateParallel(source, terminal, ops); + else { + int i=0; + int segmentStart = 0; + StreamAccessor accessor = source; + + while (true) { + while (i < ops.length && !ops[i].isStateful()) + i++; + + IntermediateOp[] slice = Arrays.copyOfRange(ops, segmentStart, i); + if (i < ops.length) { + StatefulOp op = (StatefulOp) ops[i++]; + segmentStart = i; + TreeUtils.Node intermediateResult = (TreeUtils.Node) evaluateParallel(accessor, op, slice); + accessor = new Streams.SpliteratorStreamAccessor(intermediateResult.spliterator(), intermediateResult.size()); + } + else { + return evaluateParallel(accessor, terminal, slice); + } + } + } + } + + @SuppressWarnings({ "unchecked", "raw" }) + protected<V> V evaluateParallel(StreamAccessor source, ParallelOp<U, V> terminal, IntermediateOp[] ops) { + // @@@ Bake flags and short-circuit constraints into helper + return (V) terminal.computeParallel(makeParallelHelper(source, ops)); + } + + protected<V> V evaluateSerial(TerminalOp<U, V> terminal) { + + IntermediateOp[] ops = new IntermediateOp[depth]; + boolean intermediateShortCircuit = false; + AbstractPipeline p = this; + for (int i=depth-1; i >= 0; i--) { + intermediateShortCircuit |= p.op.isShortCircuit(); + ops[i] = p.op; + p = p.upstream; + } + + if (intermediateShortCircuit || terminal.isShortCircuit() || iterator != null) { + // Pull traversal + Iterator<U> chain = iterator(); + StatefulSink<U, V> sink = terminal.sink(); + sink.begin(-1); // @@@ supply size if known (iterator was null, chain is size-preserving, initial size known) + while (chain.hasNext()) + sink.accept(chain.next()); + return sink.end(); + } + else { + // Push traversal + int nStateful = 0; + StatefulSink[] statefulSinks = new StatefulSink[ops.length]; + StatefulSink<U, V> terminalSink = terminal.sink(); + Sink sink = terminalSink; + terminalSink.begin(-1); // @@@ supply size if known to terminal stage + for (int i=ops.length-1; i >= 0; i--) { + sink = ops[i].sink(sink); + if (ops[i].isStateful()) { + StatefulSink s = (StatefulSink) sink; + statefulSinks[nStateful++] = s; + s.begin(-1); // @@@ supply size if known to *this* stage + } + } + source.forEach(sink); + for (int i = nStateful-1; i >= 0; i--) + statefulSinks[i].end(); + return terminalSink.end(); + } + } + + @SuppressWarnings({ "unchecked", "raw" }) + protected static<U,T> ParallelOp.ParallelOpHelper<U, T> makeParallelHelper(final StreamAccessor<T> source, final IntermediateOp[] ops) { + return new ParallelOp.ParallelOpHelper<U, T>() { + @Override + public int suggestDepth() { + int size = source.getSizeOrEstimate(); + if (size < 0) + size = -size; + return ForkJoinUtils.suggestDepth(size); + } + + @Override + public Spliterator<T> spliterator() { + return source.spliterator(); + } + + @Override + public Sink<T> sink(Sink<? super U> sink) { + Sink<?> chain = sink; + for (int i = ops.length-1; i >= 0; i--) { + IntermediateOp op = ops[i]; + chain = op.sink(chain); + } + + return (Sink<T>) chain; + } + + @Override + public Iterator iterator() { + Iterator it = source.iterator(); + for (IntermediateOp op : ops) + it = op.iterator(it); + return it; + } + + @Override + public <Z> Z invoke(ForkJoinTask<Z> task) { + return ForkJoinUtils.defaultFJPool().invoke(task); + } + }; + } + + public Iterator<U> iterator() { + if (iterator == null) { + iterator = (Iterator) ((op == null) ? source : op.iterator(upstream.iterator())); + } + return iterator; + } + + private Iterator<U> iterator(StreamAccessor<?> source, IntermediateOp[] stages) { + Iterator it = source.iterator(); + for (IntermediateOp op : stages) + it = op.iterator(it); + return (Iterator<U>) it; + } + + public boolean isParallel() { + return source.isParallel(); + } + + public Stream.Shape getInputShape() { + return source.getShape(); + } + + public Stream.Shape getOutputShape() { + return op == null ? source.getShape() : op.outputShape(); + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/share/classes/java/util/streams/AbstractSequentialStreamAccessor.java Sat Aug 25 19:03:55 2012 -0400 @@ -0,0 +1,66 @@ +/* + * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. Oracle designates this + * particular file as subject to the "Classpath" exception as provided + * by Oracle in the LICENSE file that accompanied this code. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ +package java.util.streams; + +import java.util.Iterator; +import java.util.Spliterator; +import java.util.functions.Sink; + +/** + * AbstractSequentialStreamAccessor + * + * @author Brian Goetz + */ +public abstract class AbstractSequentialStreamAccessor<T> implements StreamAccessor<T> { + @Override + public boolean isParallel() { + return false; + } + + @Override + public boolean isPredictableSplits() { + return false; + } + + @Override + public Stream.Shape getShape() { + return Stream.Shape.LINEAR; + } + + @Override + public Spliterator<T> spliterator() { + throw new UnsupportedOperationException(); + } + + @Override + public int getStreamFlags() { + return 0; + } + + @Override + public int getSizeOrEstimate() { + return -Integer.MAX_VALUE; + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/share/classes/java/util/streams/LinearPipeline.java Sat Aug 25 19:03:55 2012 -0400 @@ -0,0 +1,183 @@ +/* + * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. Oracle designates this + * particular file as subject to the "Classpath" exception as provided + * by Oracle in the LICENSE file that accompanied this code. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ +package java.util.streams; + +import java.util.Comparator; +import java.util.Fillable; +import java.util.Map; +import java.util.Optional; +import java.util.functions.*; +import java.util.streams.ops.*; + +/** + * LinearPipeline + * + * @param <T> Type of elements in the upstream source. + * @param <U> Type of elements in produced by this stage. + * + * @author Brian Goetz + */ +public class LinearPipeline<T, U> extends AbstractPipeline<T,U> implements Stream<U> { + + public<S> LinearPipeline(StreamAccessor<S> source) { + super(source); + assert source.getShape() == Shape.LINEAR; + } + + public LinearPipeline(AbstractPipeline<?, T> upstream, IntermediateOp<T, U> op) { + super(upstream, op); + } + + <V> LinearPipeline<U, V> chainWith(IntermediateOp<U, V> op) { + return new LinearPipeline<>(this, op); + } + + <V> LinearPipeline<T, V> replaceWith(IntermediateOp<T, V> op) { + return new LinearPipeline<>(upstream, op); + } + + public<V> Stream<V> pipeline(IntermediateOp<U, V> op) { + // @@@ delegate to shape to do instantiation + return chainWith(op); + } + + public<V> V pipeline(TerminalOp<U, V> op) { + return evaluate(op); + } + + @Override + public Stream<U> filter(Predicate<? super U> predicate) { + return pipeline(new FilterOp<>(predicate)); + } + + @Override + public <R> Stream<R> map(Mapper<? super U, ? extends R> mapper) { + return pipeline(new MapOp<>(mapper)); + } + + @Override + public <R> Stream<R> flatMap(FlatMapper<? super U, R> mapper) { + return pipeline(new FlatMapOp<>(mapper)); + } + + @Override + public Stream<U> uniqueElements() { + return pipeline(UniqOp.<U>singleton()); + } + + @Override + public Stream<U> sorted(Comparator<? super U> comparator) { + return pipeline(new SortedOp<>(comparator)); + } + + @Override + public Stream<U> cumulate(BinaryOperator<U> operator) { + return pipeline(new CumulateOp<>(operator)); + } + + @Override + public void forEach(Sink<? super U> sink) { + pipeline(new ForEachOp<>(sink)); + } + + @Override + public <A extends Fillable<? super U>> A into(A target) { + forEach(target::add); + return target; + } + + @Override + public <K> Map<K, Streamable<U>> groupBy(Mapper<? super U, ? extends K> mapper) { + return pipeline(new GroupByOp<>(mapper)); + } + + @Override + public Object[] toArray() { + return pipeline(ToArrayOp.<U>singleton()); + } + + @Override + public boolean anyMatch(Predicate<? super U> predicate) { + return pipeline(new AnyMatchOp<>(predicate)); + } + + @Override + public boolean allMatch(Predicate<? super U> predicate) { + return pipeline(new AllMatchOp<>(predicate)); + } + + @Override + public boolean noneMatch(Predicate<? super U> predicate) { + return pipeline(new NoneMatchOp<>(predicate)); + } + + @Override + public Optional<U> findFirst() { + return pipeline(FindFirstOp.<U>singleton()); + } + + @Override + public Optional<U> findAny() { + return pipeline(FindAnyOp.<U>singleton()); + } + + @Override + public Stream<U> sequential() { + if (!isParallel()) { + return this; + } + else { + TreeUtils.Node<U> collected = evaluate(TreeUtils.CollectorOp.<U>singleton()); + return Streams.stream(collected, collected.size()); + } + } + + @Override + public <R> MapStream<U, R> mapped(Mapper<? super U, ? extends R> mapper) { + return new MapPipeline<>(this, new MappedOp<>(mapper)); + } + + @Override + public U reduce(final U seed, final BinaryOperator<U> op) { + // @@@ Temporary hack to work around compiler error + Factory<U> seedFactory = new Factory<U>() { + @Override + public U make() { + return seed; + } + }; + return pipeline(new FoldOp<>(seedFactory, op, op)); + } + + @Override + public Optional<U> reduce(BinaryOperator<U> op) { + return pipeline(new SeedlessFoldOp<>(op)); + } + + @Override + public <V> V fold(Factory<V> seedFactory, Combiner<V, U, V> reducer, BinaryOperator<V> combiner) { + return pipeline(new FoldOp<>(seedFactory, reducer, combiner)); + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/share/classes/java/util/streams/MapPipeline.java Sat Aug 25 19:03:55 2012 -0400 @@ -0,0 +1,169 @@ +/* + * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. Oracle designates this + * particular file as subject to the "Classpath" exception as provided + * by Oracle in the LICENSE file that accompanied this code. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ +package java.util.streams; + +import java.util.*; +import java.util.functions.*; +import java.util.streams.ops.*; + +/** + * MapPipeline + * + * @param <U> Type of elements in the upstream. + * @param <K> Type of mapping key. + * @param <V> Type of mapping value. + * + * @author Brian Goetz + */ +public class MapPipeline<U, K, V> + extends AbstractPipeline<U, Mapping<K,V>> + implements MapStream<K, V> { + /** + * If initialized then we are committed to pull iteration via iterator. + */ + private MapIterator<K,V> iterator; + + public MapPipeline(MapStreamAccessor<K, V> source) { + super(source); + assert source.getShape() == Stream.Shape.LINEAR; + } + + public MapPipeline(AbstractPipeline<?, U> upstream, IntermediateOp<U, Mapping<K, V>> op) { + super(upstream, op); + } + + <KK, VV> MapPipeline<Mapping<K, V>, KK, VV> chainWith(IntermediateOp<Mapping<K, V>, Mapping<KK, VV>> op) { + return new MapPipeline<>(this, op); + } + + <KK, VV> MapPipeline<U, KK, VV> replaceWith(IntermediateOp<U, Mapping<KK, VV>> op) { + return new MapPipeline<>(upstream, op); + } + + @Override + public MapIterator<K,V> iterator() { + if (iterator == null) { + Iterator<Mapping<K, V>> wrapped = (op == null) + ? ((MapStreamAccessor<K, V>) source) + : op.iterator(upstream.iterator()); + iterator = (wrapped instanceof MapIterator) + ? (MapIterator<K, V>) wrapped + : new MapIterator.IteratorAdapter<>(wrapped); + } + return iterator; + } + + @Override + public void forEach(BiSink<K, V> sink) { + pipeline(new ForEachOp<>(sink)); + } + + public<U> U pipeline(TerminalOp<Mapping<K,V>, U> op) { + return evaluate(op); + } + + @Override + public <A extends Map<K, V>> A into(A target) { + forEach(target::put); + return target; + } + + @Override + public Stream<K> keys() { + return new LinearPipeline<>(this, new MapExtractKeysOp<K,V>()); + } + + @Override + public Stream<V> values() { + return new LinearPipeline<>(this, new MapExtractValuesOp<K,V>()); + } + + @Override + public MapStream<K, V> filter(BiPredicate<? super K, ? super V> predicate) { + return chainWith(new BiFilterOp<>(predicate)); + } + + @Override + public MapStream<K, V> filterKeys(Predicate<? super K> predicate) { + return chainWith(new MapFilterKeysOp<K,V>(predicate)); + } + + @Override + public MapStream<K, V> filterValues(Predicate<? super V> predicate) { + return chainWith(new MapFilterValuesOp<K,V>(predicate)); + } + + @Override + public <U> MapStream<K, U> mapValues(BiMapper<? super K, ? super V, ? extends U> mapper) { + return chainWith(new BiMapOp<>(mapper)); + } + + @Override + public <U> MapStream<K, U> mapValues(Mapper<? super V, ? extends U> mapper) { + return chainWith(new MapMapValuesOp<K,V,U>(mapper)); + } + + @Override + public MapStream<V, K> swap() { + return chainWith(MapSwapOp.<K,V>singleton()); + } + + @Override + public MapStream<K, V> sorted(Comparator<? super K> comparator) { +// return chainWith(new MapSortedOp<K,V>(comparator)); + return null; + } + + @Override + public MapStream<K, V> mergeWith(MapStream<K, ? extends V> other) { + // !?! mduigou I don't know what semantic was intended. + throw new UnsupportedOperationException("nyi"); + } + + @Override + public boolean noneMatch(BiPredicate<? super K, ? super V> predicate) { + return allMatch(predicate.negate()); + } + + @Override + public boolean allMatch(BiPredicate<? super K, ? super V> predicate) { + return pipeline(new BiAllMatchOp<>(predicate)); + } + + @Override + public boolean anyMatch(BiPredicate<? super K, ? super V> predicate) { + return pipeline(new BiAnyMatchOp<>(predicate)); + } + + @Override + public Optional<Mapping<K, V>> findAny() { + return pipeline(FindAnyOp.<Mapping<K,V>>singleton()); + } + + @Override + public Optional<Mapping<K, V>> findFirst() { + return pipeline(FindFirstOp.<Mapping<K,V>>singleton()); + } +}
--- a/src/share/classes/java/util/streams/MapStream.java Mon Aug 20 16:10:37 2012 -0400 +++ b/src/share/classes/java/util/streams/MapStream.java Sat Aug 25 19:03:55 2012 -0400 @@ -24,7 +24,8 @@ */ package java.util.streams; -import java.util.MapIterator; +import java.util.*; +import java.util.functions.*; /** * A stream who's elements are key-value mappings. @@ -34,5 +35,153 @@ * * @author Brian Goetz */ -public interface MapStream<K, V> extends MapStreamOps<K, V>, MapIterator<K, V> { +public interface MapStream<K, V> { + MapIterator<K, V> iterator(); + + + /** + * Return the mapping which is semantically "first" in the stream. If the + * stream is not ordered then repeated invocations may return a different + * mapping. + * + * @return the first mapping of the stream if any. + */ + Optional<Mapping<K, V>> findFirst(); + + /** + * Return any mapping in the stream. Repeated invocations may return the + * same mapping or a different mapping. + * + * @return a mapping in the stream. + */ + Optional<Mapping<K, V>> findAny(); + + /** + * Return a stream of the keys. + * + * @return a stream of the keys. + */ + Stream<K> keys(); + + /** + * Return a stream of the values. + * + * @return a stream of the values. + */ + Stream<V> values(); + + /** + * Return a stream filtered by the provided predicate. + * + * @param predicate the predicate used to filter elements. + * @return The filtered stream. + */ + MapStream<K, V> filter(BiPredicate<? super K, ? super V> predicate); + + /** + * Return a stream filtered by the provided predicate. + * + * @param predicate the predicate used to filter elements based upon keys. + * @return The filtered stream. + */ + MapStream<K, V> filterKeys(Predicate<? super K> predicate); + + /** + * Return a stream filtered by the provided predicate. + * + * @param predicate the predicate used to filter elements based upon values. + * @return The filtered stream. + */ + MapStream<K, V> filterValues(Predicate<? super V> predicate); + + /** + * Return a MapStream where the values have been mapped via the provided + * Mapper. + * + * @param <U> Type of mapping values in the returned stream. + * @param mapper the predicate used to map elements. + * @return The mapped stream. + */ + <U> MapStream<K, U> mapValues(BiMapper<? super K, ? super V, ? extends U> mapper); + + /** + * Return a MapStream where the values have been mapped via the provided + * Mapper. + * + * @param <U> Type of mapping values in the returned stream. + * @param mapper the predicate used to map elements. + * @return The mapped stream. + */ + <U> MapStream<K, U> mapValues(Mapper<? super V, ? extends U> mapper); + + /** + * Each element of this stream is processed by the provided sink. + * + * @param sink the Sink via which all elements will be processed. + */ + void forEach(BiSink<K, V> sink); + + // @@@ Map, or MapFillable? + <A extends Map<K, V>> A into(A target); + + /** + * Any of the elements of this map stream return {@code true} for the + * provided predicate. + * + * @param predicate The predicate against which elements will be evaluated. + * @return {@code true} if any of the elements return {@code true} for the + * provided predicate. + */ + boolean anyMatch(BiPredicate<? super K, ? super V> predicate); + + /** + * all of the elements of this map stream return {@code true} for the + * provided predicate. + * + * @param predicate The predicate against which elements will be evaluated. + * @return {@code true} if all of the elements return {@code true} for the + * provided predicate. + */ + boolean allMatch(BiPredicate<? super K, ? super V> predicate); + + /** + * None of the elements of this map stream return {@code true} for the + * provided predicate. + * + * @param predicate The predicate against which elements will be evaluated. + * @return {@code true} if none of the elements return {@code true} for the + * provided predicate. + */ + boolean noneMatch(BiPredicate<? super K, ? super V> predicate); + + /** + * Returns a stream composed of the elements o + * + * ?!? mduigou Intended semantic for handling duplicates? Is concatenation a + * valid implementation? + * + * @param other + * @return + */ + MapStream<K, V> mergeWith(MapStream<K, ? extends V> other); + + /** + * Returns a map stream containing the same elements sorted in the order + * specified by the provided Comparator. + * + * @param comparator used to evaluate elements to determine their sorted + * order. + * @return a map stream containing the same elements sorted in the order + * specified by the provided Comparator. + */ + MapStream<K, V> sorted(Comparator<? super K> comparator); + + /** + * Return a map stream which contains the same elements as this stream but + * the keys and values have been swapped. + * + * @return A map stream which contains the same elements as this stream but + * the keys and values have been swapped. + */ + MapStream<V, K> swap(); }
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/share/classes/java/util/streams/MapStreamAccessor.java Sat Aug 25 19:03:55 2012 -0400 @@ -0,0 +1,40 @@ +/* + * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. Oracle designates this + * particular file as subject to the "Classpath" exception as provided + * by Oracle in the LICENSE file that accompanied this code. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ +package java.util.streams; + +import java.util.MapIterator; +import java.util.Mapping; +import java.util.functions.BiSink; + +/** + * MapStreamAccessor + * + * @author Brian Goetz + */ +public interface MapStreamAccessor<K, V> extends StreamAccessor<Mapping<K,V>>, MapIterator<K,V> { + void forEach(BiSink<? super K, ? super V> sink); + + Stream.Shape getShape() default { return Stream.Shape.KEY_VALUE; } +}
--- a/src/share/classes/java/util/streams/MapStreamOps.java Mon Aug 20 16:10:37 2012 -0400 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,188 +0,0 @@ -/* - * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved. - * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. - * - * This code is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License version 2 only, as - * published by the Free Software Foundation. Oracle designates this - * particular file as subject to the "Classpath" exception as provided - * by Oracle in the LICENSE file that accompanied this code. - * - * This code is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - * version 2 for more details (a copy is included in the LICENSE file that - * accompanied this code). - * - * You should have received a copy of the GNU General Public License version - * 2 along with this work; if not, write to the Free Software Foundation, - * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. - * - * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA - * or visit www.oracle.com if you need additional information or have any - * questions. - */ -package java.util.streams; - -import java.util.Comparator; -import java.util.Map; -import java.util.Mapping; -import java.util.Optional; -import java.util.functions.*; - -/** - * Operations upon MapStreams. - * - * @param <K> Type of mapping key. - * @param <V> Type of mapping value. - * - * @author Brian Goetz - */ -interface MapStreamOps<K, V> { - - /** - * Return the mapping which is semantically "first" in the stream. If the - * stream is not ordered then repeated invocations may return a different - * mapping. - * - * @return the first mapping of the stream if any. - */ - Optional<Mapping<K, V>> findFirst(); - - /** - * Return any mapping in the stream. Repeated invocations may return the - * same mapping or a different mapping. - * - * @return a mapping in the stream. - */ - Optional<Mapping<K, V>> findAny(); - - /** - * Return a stream of the keys. - * - * @return a stream of the keys. - */ - Stream<K> keys(); - - /** - * Return a stream of the values. - * - * @return a stream of the values. - */ - Stream<V> values(); - - /** - * Return a stream filtered by the provided predicate. - * - * @param predicate the predicate used to filter elements. - * @return The filtered stream. - */ - MapStream<K, V> filter(BiPredicate<? super K, ? super V> predicate); - - /** - * Return a stream filtered by the provided predicate. - * - * @param predicate the predicate used to filter elements based upon keys. - * @return The filtered stream. - */ - MapStream<K, V> filterKeys(Predicate<? super K> predicate); - - /** - * Return a stream filtered by the provided predicate. - * - * @param predicate the predicate used to filter elements based upon values. - * @return The filtered stream. - */ - MapStream<K, V> filterValues(Predicate<? super V> predicate); - - /** - * Return a MapStream where the values have been mapped via the provided - * Mapper. - * - * @param <U> Type of mapping values in the returned stream. - * @param mapper the predicate used to map elements. - * @return The mapped stream. - */ - <U> MapStream<K, U> mapValues(BiMapper<? super K, ? super V, ? extends U> mapper); - - /** - * Return a MapStream where the values have been mapped via the provided - * Mapper. - * - * @param <U> Type of mapping values in the returned stream. - * @param mapper the predicate used to map elements. - * @return The mapped stream. - */ - <U> MapStream<K, U> mapValues(Mapper<? super V, ? extends U> mapper); - - /** - * Each element of this stream is processed by the provided sink. - * - * @param sink the Sink via which all elements will be processed. - */ - void forEach(BiSink<K, V> sink); - - // @@@ Map, or MapFillable? - <A extends Map<K, V>> A into(A target); - - /** - * Any of the elements of this map stream return {@code true} for the - * provided predicate. - * - * @param predicate The predicate against which elements will be evaluated. - * @return {@code true} if any of the elements return {@code true} for the - * provided predicate. - */ - boolean anyMatch(BiPredicate<? super K, ? super V> predicate); - - /** - * all of the elements of this map stream return {@code true} for the - * provided predicate. - * - * @param predicate The predicate against which elements will be evaluated. - * @return {@code true} if all of the elements return {@code true} for the - * provided predicate. - */ - boolean allMatch(BiPredicate<? super K, ? super V> predicate); - - /** - * None of the elements of this map stream return {@code true} for the - * provided predicate. - * - * @param predicate The predicate against which elements will be evaluated. - * @return {@code true} if none of the elements return {@code true} for the - * provided predicate. - */ - boolean noneMatch(BiPredicate<? super K, ? super V> predicate); - - /** - * Returns a stream composed of the elements o - * - * ?!? mduigou Intended semantic for handling duplicates? Is concatenation a - * valid implementation? - * - * @param other - * @return - */ - MapStream<K, V> mergeWith(MapStream<K, ? extends V> other); - - /** - * Returns a map stream containing the same elements sorted in the order - * specified by the provided Comparator. - * - * @param comparator used to evaluate elements to determine their sorted - * order. - * @return a map stream containing the same elements sorted in the order - * specified by the provided Comparator. - */ - MapStream<K, V> sorted(Comparator<? super K> comparator); - - /** - * Return a map stream which contains the same elements as this stream but - * the keys and values have been swapped. - * - * @return A map stream which contains the same elements as this stream but - * the keys and values have been swapped. - */ - MapStream<V, K> swap(); -}
--- a/src/share/classes/java/util/streams/MapStreamable.java Mon Aug 20 16:10:37 2012 -0400 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,36 +0,0 @@ -/* - * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved. - * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. - * - * This code is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License version 2 only, as - * published by the Free Software Foundation. Oracle designates this - * particular file as subject to the "Classpath" exception as provided - * by Oracle in the LICENSE file that accompanied this code. - * - * This code is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - * version 2 for more details (a copy is included in the LICENSE file that - * accompanied this code). - * - * You should have received a copy of the GNU General Public License version - * 2 along with this work; if not, write to the Free Software Foundation, - * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. - * - * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA - * or visit www.oracle.com if you need additional information or have any - * questions. - */ -package java.util.streams; - -/** - * MapStreamable - * - * @param <K> Type of mapping key. - * @param <V> Type of mapping value. - * - * @author Brian Goetz - */ -public interface MapStreamable<K, V> extends MapStreamOps<K,V> { -}
--- a/src/share/classes/java/util/streams/ParallelPipeline.java Mon Aug 20 16:10:37 2012 -0400 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,360 +0,0 @@ -/* - * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved. - * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. - * - * This code is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License version 2 only, as - * published by the Free Software Foundation. Oracle designates this - * particular file as subject to the "Classpath" exception as provided - * by Oracle in the LICENSE file that accompanied this code. - * - * This code is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - * version 2 for more details (a copy is included in the LICENSE file that - * accompanied this code). - * - * You should have received a copy of the GNU General Public License version - * 2 along with this work; if not, write to the Free Software Foundation, - * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. - * - * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA - * or visit www.oracle.com if you need additional information or have any - * questions. - */ -package java.util.streams; - -import java.util.*; -import java.util.concurrent.ForkJoinTask; -import java.util.concurrent.ForkJoinUtils; -import java.util.concurrent.RecursiveAction; -import java.util.concurrent.RecursiveTask; -import java.util.functions.*; -import java.util.streams.ops.*; - -/** - * ParallelPipeline - * - * @author Brian Goetz - */ -public class ParallelPipeline<T, U> implements ParallelStream<U> { - private final ParallelStreamable<T> base; - private final ParallelPipeline<?, T> upstream; - private final StatelessOp<T, U> stage; - - public ParallelPipeline(ParallelStreamable<T> base, StatelessOp<T, U> onlyStage) { - this.base = base; - this.upstream = null; - this.stage = onlyStage; - assert !stage.isStateful(); - } - - ParallelPipeline(ParallelPipeline<?, T> upstream, StatelessOp<T, U> newStage) { - this.base = null; - this.upstream = upstream; - this.stage = newStage; - assert !stage.isStateful(); - } - - public static<T> ParallelPipeline<T,T> wrap(ParallelStreamable<T> source) { - return new ParallelPipeline<>(source, new IdOp<T>()); - } - - private <V> ParallelPipeline<T, V> replaceWith(StatelessOp<T, V> newStage) { - return base != null - ? new ParallelPipeline<>(base, newStage) - : new ParallelPipeline<>(upstream, newStage); - } - - private <V> ParallelPipeline<U, V> chainWith(StatelessOp<U, V> newStage) { - return new ParallelPipeline<>(this, newStage); - } - - private Iterator<U> iterator() { - return stage.iterator(base == null ? upstream.iterator() : base.iterator()); - } - - private int getStreamState() { - // @@@ Need to add stream-state support to ParallelStreamable - return stage.getStreamState(upstream != null ? upstream.getStreamState() : Stream.STATE_SIZED); - } - - @Override - public ParallelStream<U> filter(Predicate<? super U> predicate) { - return chainWith(new FilterOp<>(predicate)); - } - - @Override - public <R> ParallelStream<R> map(Mapper<? super U, ? extends R> mapper) { - return chainWith(new MapOp<>(mapper)); - } - - @Override - public <R> ParallelStream<R> flatMap(FlatMapper<? super U, R> mapper) { - return chainWith(new FlatMapOp<>(mapper)); - } - - public<V> V pipeline(EagerOp<U, V> op) { - return op.computeParallel(base, makeHelper()); - } - - public<V> V pipeline(ShortCircuitEagerOp<U, V> op) { - return op.computeParallel(base, makeHelper()); - } - - public<V> ParallelStreamable<V> pipeline(StatefulOp<U, V, ?> op) { - return op.computeParallel(base, makeHelper()); - } - - @Override - public ParallelStreamable<U> uniqueElements() { - return pipeline(new UniqOp<U>()); - } - - @Override - public ParallelStreamable<U> sorted(Comparator<? super U> comparator) { - return pipeline(new SortedOp<>(comparator)); - } - - @Override - public ParallelStreamable<U> cumulate(BinaryOperator<U> operator) { - return pipeline(new CumulateOp<>(operator)); - } - - @Override - public void forEach(final Sink<? super U> sink) { - int depth = ForkJoinUtils.suggestDepth(base.size()); - Sink<T> compoundSink = makeSink(sink); - Spliterator<T> spliterator = base.spliterator(); - if (depth == 0) { - spliterator.forEach(compoundSink); - } else { - ForkJoinUtils.defaultFJPool().invoke(new ForEachTask<>(depth, spliterator, compoundSink)); - } - } - - @Override - public <A extends Fillable<? super U>> A into(A target) { - forEach((U u) -> { target.add(u); }); - return target; - } - - @Override - public Object[] toArray() { - return pipeline(new ToArrayOp<U>()); - } - - @Override - public <K> Map<K, SizedStreamable<U>> groupBy(Mapper<? super U, ? extends K> mapper) { - return pipeline(new GroupByOp<>(mapper)); - } - - @Override - public SizedStreamable<U> sequential() { - // @@@ TODO: build more generalized machinery for building trees of splits, pre-sizing arrays, pre-packing, etc - ParallelOp.ParallelOpHelper<U, T> helper = makeHelper(); - int depth = helper.suggestDepth(base); - Spliterator<T> spliterator = base.spliterator(); - if (depth == 0) { - int size = spliterator.getRemainingSizeIfKnown(); - StreamBuilder<U> builder = (size != -1 && (getStreamState() & Stream.STATE_SIZED) != 0) - ? StreamBuilders.<U>makeFixed(size) - : StreamBuilders.<U>make(); - spliterator.forEach(helper.sink(builder)); - return builder; - } - else { - if (spliterator.isPredictableSplits() && (getStreamState() & Stream.STATE_SIZED) != 0) { - int length = spliterator.getRemainingSizeIfKnown(); - U[] array = (U[]) new Object[length]; - helper.invoke(new SizedSequentialTask<>(depth, spliterator, helper, array, 0, length)); - return Arrays.asList(array); - } - else { - return helper.invoke(new SequentialTask<>(depth, spliterator, helper)); - } - } - } - - @Override - public U reduce(final U seed, final BinaryOperator<U> op) { - // @@@ Temporary hack to work around compiler error - Factory<U> seedFactory = new Factory<U>() { - @Override - public U make() { - return seed; - } - }; - return pipeline(new FoldOp<>(seedFactory, op, op)); - } - - @Override - public Optional<U> reduce(BinaryOperator<U> op) { - return pipeline(new SeedlessFoldOp<>(op)); - } - - @Override - public <V> V fold(Factory<V> seedFactory, Combiner<V, U, V> reducer, BinaryOperator<V> combiner) { - return pipeline(new FoldOp<>(seedFactory, reducer, combiner)); - } - - @Override - public boolean anyMatch(Predicate<? super U> predicate) { - return pipeline(new AnyMatchOp<>(predicate)); - } - - @Override - public boolean allMatch(Predicate<? super U> predicate) { - return pipeline(new AllMatchOp<>(predicate)); - } - - @Override - public boolean noneMatch(Predicate<? super U> predicate) { - return pipeline(new NoneMatchOp<>(predicate)); - } - - @Override - public Optional<U> findFirst() { - return pipeline(new FindFirstOp<U>()); - } - - @Override - public Optional<U> findAny() { - return pipeline(new FindAnyOp<U>()); - } - - private<V> Sink<T> makeSink(Sink<V> sink) { - ParallelPipeline<?,?> pipe = this; - Sink<?> chain = sink; - - do { - StatelessOp<?,?> thisStage = pipe.stage; - chain = thisStage.sink( (Sink) chain); - pipe = pipe.upstream; - } while (pipe != null); - - return (Sink<T>) chain; - } - - private<V> Mapper<Sink<V>, Sink<T>> makeSinkFactory() { - return new Mapper<Sink<V>, Sink<T>>() { - @Override - public Sink<T> map(final Sink<V> sink) { - return makeSink(sink); - } - }; - } - - private ParallelOp.ParallelOpHelper<U, T> makeHelper() { - return new ParallelOp.ParallelOpHelper<U, T>() { - @Override - public int suggestDepth(ParallelStreamable<T> source) { - return ForkJoinUtils.suggestDepth(source.size()); - } - - @Override - public Sink<T> sink(Sink<U> sink) { - return makeSink(sink); - } - - @Override - public Iterator<U> iterator() { - return ParallelPipeline.this.iterator(); - } - - @Override - public <Z> Z invoke(ForkJoinTask<Z> task) { - return ForkJoinUtils.defaultFJPool().invoke(task); - } - }; - } - - private static class ForEachTask<T> extends RecursiveAction { - private final int depth; - private final Spliterator<T> spliterator; - private final Sink<T> sink; - - private ForEachTask(int depth, Spliterator<T> spliterator, Sink<T> sink) { - this.depth = depth; - this.spliterator = spliterator; - this.sink = sink; - } - - @Override - protected void compute() { - if (depth == 0) { - spliterator.forEach(sink); - } else { - ForEachTask<T> left = new ForEachTask<>(depth - 1, spliterator.split(), sink); - ForEachTask<T> right = new ForEachTask<>(depth - 1, spliterator, sink); - right.fork(); - left.compute(); - right.join(); - } - } - } - - private static class SizedSequentialTask<T, U> extends RecursiveAction { - private final int depth; - private final Spliterator<T> spliterator; - private ParallelOp.ParallelOpHelper<U, T> helper; - private final U[] array; - private int offset; - private int length; - - private SizedSequentialTask(int depth, Spliterator<T> spliterator, ParallelOp.ParallelOpHelper<U, T> helper, U[] array, int offset, int length) { - this.depth = depth; - this.spliterator = spliterator; - this.helper = helper; - this.array = array; - this.offset = offset; - this.length = length; - } - - @Override - protected void compute() { - if (depth == 0) { - spliterator.forEach(helper.sink(Arrays.sink(array, offset, length))); - } - else { - Spliterator<T> split = spliterator.split(); - int leftSize = split.getRemainingSizeIfKnown(); - assert(leftSize != -1); - SizedSequentialTask<T, U> left = new SizedSequentialTask<>(depth-1, split, helper, array, offset, leftSize); - SizedSequentialTask<T, U> right = new SizedSequentialTask<>(depth-1, spliterator, helper, array, offset+leftSize, length-leftSize); - right.fork(); - left.compute(); - right.join(); - } - } - } - - private static class SequentialTask<T, U> extends RecursiveTask<SizedStreamable<U>> { - private final int depth; - private final Spliterator<T> spliterator; - private ParallelOp.ParallelOpHelper<U, T> helper; - - public SequentialTask(int depth, Spliterator<T> spliterator, ParallelOp.ParallelOpHelper<U, T> helper) { - this.depth = depth; - this.spliterator = spliterator; - this.helper = helper; - } - - @Override - protected SizedStreamable<U> compute() { - if (depth == 0) { - StreamBuilder<U> builder = StreamBuilders.make(); - spliterator.forEach(helper.sink(builder)); - return builder; - } - else { - SequentialTask<T, U> left = new SequentialTask<>(depth-1, spliterator.split(), helper); - SequentialTask<T, U> right = new SequentialTask<>(depth-1, spliterator, helper); - right.fork(); - SizedStreamable<U> leftResult = left.compute(); - SizedStreamable<U> rightResult = right.join(); - return Streams.concatenate(leftResult, rightResult); - } - } - } -}
--- a/src/share/classes/java/util/streams/ParallelStream.java Mon Aug 20 16:10:37 2012 -0400 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,34 +0,0 @@ -/* - * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved. - * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. - * - * This code is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License version 2 only, as - * published by the Free Software Foundation. Oracle designates this - * particular file as subject to the "Classpath" exception as provided - * by Oracle in the LICENSE file that accompanied this code. - * - * This code is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - * version 2 for more details (a copy is included in the LICENSE file that - * accompanied this code). - * - * You should have received a copy of the GNU General Public License version - * 2 along with this work; if not, write to the Free Software Foundation, - * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. - * - * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA - * or visit www.oracle.com if you need additional information or have any - * questions. - */ -package java.util.streams; - -/** - * ParallelStream - * - * @param <T> Type of elements. - * - * @author Brian Goetz - */ -public interface ParallelStream<T> extends ParallelStreamOps<T> { }
--- a/src/share/classes/java/util/streams/ParallelStreamOps.java Mon Aug 20 16:10:37 2012 -0400 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,90 +0,0 @@ -/* - * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved. - * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. - * - * This code is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License version 2 only, as - * published by the Free Software Foundation. Oracle designates this - * particular file as subject to the "Classpath" exception as provided - * by Oracle in the LICENSE file that accompanied this code. - * - * This code is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - * version 2 for more details (a copy is included in the LICENSE file that - * accompanied this code). - * - * You should have received a copy of the GNU General Public License version - * 2 along with this work; if not, write to the Free Software Foundation, - * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. - * - * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA - * or visit www.oracle.com if you need additional information or have any - * questions. - */ -package java.util.streams; - -import java.util.Comparator; -import java.util.Fillable; -import java.util.Map; -import java.util.Optional; -import java.util.functions.*; - -/** - * ParallelStreamOps - * - * @param <T> Type of elements. - * - * @author Brian Goetz - */ -interface ParallelStreamOps<T> { - ParallelStream<T> filter(Predicate<? super T> predicate); - - <R> ParallelStream<R> map(Mapper<? super T, ? extends R> mapper); - - <R> ParallelStream<R> flatMap(FlatMapper<? super T, R> mapper); - - void forEach(Sink<? super T> sink); - - <A extends Fillable<? super T>> A into(A target); - - Object[] toArray(); - - <K> Map<K, SizedStreamable<T>> groupBy(Mapper<? super T, ? extends K> mapper); - - /** - * Produce an {@link Iterable} representing the contents of this {@code Splittable}. In general, this method is - * only called at the top of a decomposition tree, indicating that operations that produced the {@code Spliterable} - * can happen in parallel, but the results are assembled for sequential traversal. This is designed to support - * patterns like: - * collection.filter(t -> t.matches(k)) - * .map(t -> t.getLabel()) - * .sequential() - * .forEach(e -> println(e)); - * where the filter / map operations can occur in parallel, and then the results can be traversed - * sequentially in a predicatable order. - */ - SizedStreamable<T> sequential(); - - ParallelStreamable<T> uniqueElements(); - - ParallelStreamable<T> sorted(Comparator<? super T> comparator); - - ParallelStreamable<T> cumulate(BinaryOperator<T> operator); - - T reduce(T base, BinaryOperator<T> op); - - Optional<T> reduce(BinaryOperator<T> op); - - <U> U fold(Factory<U> baseFactory, Combiner<U, T, U> reducer, BinaryOperator<U> combiner); - - boolean anyMatch(Predicate<? super T> predicate); - - boolean allMatch(Predicate<? super T> predicate); - - boolean noneMatch(Predicate<? super T> predicate); - - Optional<T> findFirst(); - - Optional<T> findAny(); -}
--- a/src/share/classes/java/util/streams/ParallelStreamable.java Mon Aug 20 16:10:37 2012 -0400 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,135 +0,0 @@ -/* - * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved. - * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. - * - * This code is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License version 2 only, as - * published by the Free Software Foundation. Oracle designates this - * particular file as subject to the "Classpath" exception as provided - * by Oracle in the LICENSE file that accompanied this code. - * - * This code is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - * version 2 for more details (a copy is included in the LICENSE file that - * accompanied this code). - * - * You should have received a copy of the GNU General Public License version - * 2 along with this work; if not, write to the Free Software Foundation, - * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. - * - * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA - * or visit www.oracle.com if you need additional information or have any - * questions. - */ -package java.util.streams; - -import java.util.*; -import java.util.functions.*; -import java.util.streams.ops.FilterOp; -import java.util.streams.ops.FlatMapOp; -import java.util.streams.ops.MapOp; - -/** - * ParallelStreamable - * - * @param <T> Type of elements. - * - * @author Brian Goetz - */ -public interface ParallelStreamable<T> extends ParallelStreamOps<T>, Splittable<T>, Sized { - @Override - ParallelStream<T> filter(Predicate<? super T> predicate) default { - return new ParallelPipeline<>(this, new FilterOp<>(predicate)); - } - - @Override - <R> ParallelStream<R> map(Mapper<? super T, ? extends R> mapper) default { - return new ParallelPipeline<>(this, new MapOp<>(mapper)); - } - - @Override - <R> ParallelStream<R> flatMap(FlatMapper<? super T, R> mapper) default { - return new ParallelPipeline<>(this, new FlatMapOp<>(mapper)); - } - - @Override - ParallelStreamable<T> uniqueElements() default { - return ParallelPipeline.wrap(this).uniqueElements(); - } - - @Override - ParallelStreamable<T> sorted(Comparator<? super T> comparator) default { - return ParallelPipeline.wrap(this).sorted(comparator); - } - - @Override - ParallelStreamable<T> cumulate(BinaryOperator<T> operator) default { - return ParallelPipeline.wrap(this).cumulate(operator); - } - - @Override - public void forEach(Sink<? super T> sink) default { - ParallelPipeline.wrap(this).forEach(sink); - } - - @Override - <A extends Fillable<? super T>> A into(A target) default { - return ParallelPipeline.wrap(this).into(target); - } - - @Override - Object[] toArray() default { - return ParallelPipeline.wrap(this).toArray(); - } - - @Override - <K> Map<K, SizedStreamable<T>> groupBy(Mapper<? super T, ? extends K> mapper) default { - return ParallelPipeline.wrap(this).groupBy(mapper); - } - - @Override - public T reduce(T base, BinaryOperator<T> op) default { - return ParallelPipeline.wrap(this).reduce(base, op); - } - - @Override - Optional<T> reduce(BinaryOperator<T> op) default { - return ParallelPipeline.wrap(this).reduce(op); - } - - @Override - public <U> U fold(Factory<U> seedFactory, Combiner<U, T, U> reducer, BinaryOperator<U> combiner) default { - return ParallelPipeline.wrap(this).fold(seedFactory, reducer, combiner); - } - - @Override - public SizedStreamable<T> sequential() default { - return ParallelPipeline.wrap(this).sequential(); - } - - @Override - boolean anyMatch(Predicate<? super T> predicate) default { - return ParallelPipeline.wrap(this).anyMatch(predicate); - } - - @Override - boolean allMatch(Predicate<? super T> predicate) default { - return ParallelPipeline.wrap(this).allMatch(predicate); - } - - @Override - boolean noneMatch(Predicate<? super T> predicate) default { - return ParallelPipeline.wrap(this).noneMatch(predicate); - } - - @Override - Optional<T> findAny() default { - return ParallelPipeline.wrap(this).findAny(); - } - - @Override - Optional<T> findFirst() default { - return ParallelPipeline.wrap(this).findFirst(); - } -}
--- a/src/share/classes/java/util/streams/SequentialMapPipeline.java Mon Aug 20 16:10:37 2012 -0400 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,231 +0,0 @@ -/* - * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved. - * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. - * - * This code is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License version 2 only, as - * published by the Free Software Foundation. Oracle designates this - * particular file as subject to the "Classpath" exception as provided - * by Oracle in the LICENSE file that accompanied this code. - * - * This code is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - * version 2 for more details (a copy is included in the LICENSE file that - * accompanied this code). - * - * You should have received a copy of the GNU General Public License version - * 2 along with this work; if not, write to the Free Software Foundation, - * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. - * - * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA - * or visit www.oracle.com if you need additional information or have any - * questions. - */ -package java.util.streams; - -import java.util.*; -import java.util.functions.*; -import java.util.streams.ops.*; - -/** - * SequentialMapPipeline - * - * @param <U> Type of elements in the upstream. - * @param <K> Type of mapping key. - * @param <V> Type of mapping value. - * - * @author Brian Goetz - */ -public class SequentialMapPipeline<U, K, V> implements MapStream<K, V>, Traversable<Mapping<K,V>> { - /** - * Source of elements - */ - private final Traversable<U> upstream; - - /** - * Operation for this stage. - */ - private final ElementwiseOp<U, Mapping<K, V>> stage; - - /** - * If initialized then we are committed to pull iteration via iterator. - */ - private MapIterator<K,V> iterator; - - public SequentialMapPipeline(Traversable<U> upstream, ElementwiseOp<U, Mapping<K, V>> stage) { - this.upstream = Objects.requireNonNull(upstream); - this.stage = Objects.requireNonNull(stage); - } - - private<KK,VV> SequentialMapPipeline<U,KK,VV> replaceWith(ElementwiseOp<U, Mapping<KK,VV>> newStage) { - return new SequentialMapPipeline<>(upstream, newStage); - } - - private<KK,VV> SequentialMapPipeline<Mapping<K,V>,KK,VV> chainWith(ElementwiseOp<Mapping<K,V>, Mapping<KK,VV>> newStage) { - return new SequentialMapPipeline<>(this, newStage); - } - - @Override - public MapIterator<K,V> iterator() { - if (iterator == null) { - iterator = new MapIterator.IteratorAdapter<>(stage.iterator(upstream.iterator())); - } - return iterator; - } - - @Override - public boolean hasNext() { - return iterator().hasNext(); - } - - @Override - public Mapping<K, V> next() { - return iterator().next(); - } - - @Override - public K nextKey() { - return iterator().nextKey(); - } - - @Override - public V nextValue() { - return iterator().nextValue(); - } - - @Override - public K curKey() { - return iterator().curKey(); - } - - @Override - public V curValue() { - return iterator().curValue(); - } - - private void forEachInto(Sink<U> wrappedSink) { - if (stage.isStateful()) { - StatefulSink<U, Void> statefulSink = (StatefulSink<U, Void>) wrappedSink; - statefulSink.begin(-1); - upstream.forEach(wrappedSink); - statefulSink.end(); - } else { - upstream.forEach(wrappedSink); - } - } - - @Override - public void forEach(BiSink<K, V> sink) { - if (iterator == null) { - forEachInto(stage.sink(sink)); - } else { - for (MapIterator<K,V> each = iterator(); each.hasNext(); ) { - K key = each.nextKey(); - V value = each.curValue(); - sink.accept(key, value); - } - } - } - - @Override - public void forEach(Sink<? super Mapping<K, V>> sink) { - if (sink instanceof BiSink) { - forEach((BiSink<K,V>) (Sink) sink); - return; - } - if (iterator == null) { - forEachInto(stage.sink(sink)); - } else { - for (Iterator<Mapping<K,V>> each = iterator(); each.hasNext(); ) { - sink.accept(each.next()); - } - } - } - - public<U> U pipeline(ShortCircuitEagerOp<Mapping<K,V>, U> op) { - return op.evaluate(iterator()); - } - - @Override - public <A extends Map<K, V>> A into(A target) { - forEach(target::put); - return target; - } - - @Override - public Stream<K> keys() { - return new SequentialPipeline<>(this, new MapExtractKeysOp<K,V>()); - } - - @Override - public Stream<V> values() { - return new SequentialPipeline<>(this, new MapExtractValuesOp<K,V>()); - } - - @Override - public MapStream<K, V> filter(BiPredicate<? super K, ? super V> predicate) { - return chainWith(new BiFilterOp<>(predicate)); - } - - @Override - public MapStream<K, V> filterKeys(Predicate<? super K> predicate) { - return chainWith(new MapFilterKeysOp<K,V>(predicate)); - } - - @Override - public MapStream<K, V> filterValues(Predicate<? super V> predicate) { - return chainWith(new MapFilterValuesOp<K,V>(predicate)); - } - - @Override - public <U> MapStream<K, U> mapValues(BiMapper<? super K, ? super V, ? extends U> mapper) { - return chainWith(new BiMapOp<>(mapper)); - } - - @Override - public <U> MapStream<K, U> mapValues(Mapper<? super V, ? extends U> mapper) { - return chainWith(new MapMapValuesOp<K,V,U>(mapper)); - } - - @Override - public MapStream<V, K> swap() { - return chainWith(MapSwapOp.<K,V>singleton()); - } - - @Override - public MapStream<K, V> sorted(Comparator<? super K> comparator) { - return chainWith(new MapSortedOp(comparator)); - } - - @Override - public MapStream<K, V> mergeWith(MapStream<K, ? extends V> other) { - // !?! mduigou I don't know what semantic was intended. - throw new UnsupportedOperationException("nyi"); - } - - @Override - public boolean noneMatch(BiPredicate<? super K, ? super V> predicate) { - return allMatch(predicate.negate()); - } - - @Override - public boolean allMatch(BiPredicate<? super K, ? super V> predicate) { - return pipeline(new BiAllMatchOp<K,V>(predicate)); - } - - @Override - public boolean anyMatch(BiPredicate<? super K, ? super V> predicate) { - return pipeline(new BiAnyMatchOp<K,V>(predicate)); - } - - @Override - public Optional<Mapping<K, V>> findAny() { - return pipeline(new FindAnyOp<Mapping<K,V>>()); - } - - @Override - public Optional<Mapping<K, V>> findFirst() { - return pipeline(new FindFirstOp<Mapping<K,V>>()); - } -}
--- a/src/share/classes/java/util/streams/SequentialPipeline.java Mon Aug 20 16:10:37 2012 -0400 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,239 +0,0 @@ -/* - * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved. - * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. - * - * This code is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License version 2 only, as - * published by the Free Software Foundation. Oracle designates this - * particular file as subject to the "Classpath" exception as provided - * by Oracle in the LICENSE file that accompanied this code. - * - * This code is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - * version 2 for more details (a copy is included in the LICENSE file that - * accompanied this code). - * - * You should have received a copy of the GNU General Public License version - * 2 along with this work; if not, write to the Free Software Foundation, - * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. - * - * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA - * or visit www.oracle.com if you need additional information or have any - * questions. - */ -package java.util.streams; - -import java.util.*; -import java.util.functions.*; -import java.util.streams.ops.*; - -/** - * SequentialPipeline - * - * @param <T> Type of elements in the upstream source. - * @param <U> Type of elements in produced by this stage. - * - * @author Brian Goetz - */ -public class SequentialPipeline<T, U> implements Stream<U>, Traversable<U> { - /** - * Source for elements. - */ - private final Traversable<T> upstream; - - /** - * Operation for this stage. - */ - private final ElementwiseOp<T, U> stage; - - /** - * If initialized then we are committed to pull iteration via iterator. - */ - private Iterator<U> iterator; - - public SequentialPipeline(Traversable<T> upstream, ElementwiseOp<T,U> stage) { - this.upstream = Objects.requireNonNull(upstream); - this.stage = Objects.requireNonNull(stage); - } - - private<V> SequentialPipeline<T,V> replaceWith(ElementwiseOp<T,V> newStage) { - return new SequentialPipeline<>(upstream, newStage); - } - - private<V> SequentialPipeline<U,V> chainWith(ElementwiseOp<U,V> newStage) { - return new SequentialPipeline<>(this, newStage); - } - - public static<T> SequentialPipeline<T,T> wrap(Streamable<T> source) { - return new SequentialPipeline<>(source, new IdOp<T>()); - } - - @Override - public Iterator<U> iterator() { - if (iterator == null) { - iterator = stage.iterator(upstream.iterator()); - } - return iterator; - } - - // Calling the Iterator methods forces us into lazy mode - @Override - public boolean hasNext() { - return iterator().hasNext(); - } - - // Calling the Iterator methods forces us into lazy mode - @Override - public U next() { - return iterator().next(); - } - - @Override - public Stream<U> filter(Predicate<? super U> predicate) { - return chainWith(new FilterOp<>(predicate)); - } - - @Override - public <R> Stream<R> map(Mapper<? super U, ? extends R> mapper) { - return chainWith(new MapOp<>(mapper)); - } - - @Override - public <R> Stream<R> flatMap(FlatMapper<? super U, R> mapper) { - return chainWith(new FlatMapOp<>(mapper)); - } - - @Override - public void forEach(Sink<? super U> sink) { - if (iterator == null) { - Sink<T> wrappedSink = stage.sink(sink); - if (stage.isStateful()) { - StatefulSink<U, Void> statefulSink = (StatefulSink<U, Void>) wrappedSink; - // @@@ supply size if known - statefulSink.begin(-1); - upstream.forEach(wrappedSink); - statefulSink.end(); - } else { - upstream.forEach(wrappedSink); - } - } else { - for (Iterator<U> each = iterator(); each.hasNext(); ) { - sink.accept(each.next()); - } - } - } - - @Override - public <A extends Fillable<? super U>> A into(A target) { - forEach(target::add); - return target; - } - - @Override - public <K> Map<K, SizedStreamable<U>> groupBy(Mapper<? super U, ? extends K> mapper) { - return pipeline(new GroupByOp<>(mapper)); - } - - @Override - public Object[] toArray() { - return pipeline(ToArrayOp.<U>singleton()); - } - - public<V> Stream<V> pipeline(StatelessOp<U, V> op) { - return chainWith(op); - } - - public<V> Stream<V> pipeline(StatefulOp<U, V, ?> op) { - return chainWith(op); - } - - public<V> V pipeline(EagerOp<U, V> op) { - StatefulSink<U, V> sink = op.sink(); - sink.begin(-1); // @@@ supply size if known - forEach(sink); - return sink.end(); - } - - public<V> V pipeline(ShortCircuitEagerOp<U, V> op) { - return op.evaluate(iterator()); - } - - public static<T,U> U eagerOp(Traversable<T> source, EagerOp<T,U> op) { - StatefulSink<T,U> sink = op.sink(); - sink.begin(-1); // @@@ supply size if known - source.forEach(sink); - return sink.end(); - } - - public static<T,U> U eagerOp(Iterable<T> source, ShortCircuitEagerOp<T,U> op) { - return op.evaluate(source.iterator()); - } - - @Override - public boolean anyMatch(Predicate<? super U> predicate) { - return pipeline(new AnyMatchOp<>(predicate)); - } - - @Override - public boolean allMatch(Predicate<? super U> predicate) { - return pipeline(new AllMatchOp<>(predicate)); - } - - @Override - public boolean noneMatch(Predicate<? super U> predicate) { - return pipeline(new NoneMatchOp<>(predicate)); - } - - @Override - public Optional<U> findFirst() { - return pipeline(new FindFirstOp<U>()); - } - - @Override - public Optional<U> findAny() { - return pipeline(new FindAnyOp<U>()); - } - - @Override - public <R> MapStream<U, R> mapped(Mapper<? super U, ? extends R> mapper) { - return new SequentialMapPipeline<>(this, new MappedOp<>(mapper)); - } - - @Override - public Stream<U> uniqueElements() { - return pipeline(new UniqOp<U>()); - } - - @Override - public Stream<U> sorted(Comparator<? super U> comparator) { - return pipeline(new SortedOp<>(comparator)); - } - - @Override - public Stream<U> cumulate(BinaryOperator<U> operator) { - return pipeline(new CumulateOp<>(operator)); - } - - @Override - public U reduce(final U seed, final BinaryOperator<U> op) { - // @@@ Temporary hack to work around compiler error - Factory<U> seedFactory = new Factory<U>() { - @Override - public U make() { - return seed; - } - }; - return pipeline(new FoldOp<>(seedFactory, op, op)); - } - - @Override - public Optional<U> reduce(BinaryOperator<U> op) { - return pipeline(new SeedlessFoldOp<U>(op)); - } - - @Override - public <V> V fold(Factory<V> seedFactory, Combiner<V, U, V> reducer, BinaryOperator<V> combiner) { - return pipeline(new FoldOp<>(seedFactory, reducer, combiner)); - } -}
--- a/src/share/classes/java/util/streams/SizedStreamable.java Mon Aug 20 16:10:37 2012 -0400 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,37 +0,0 @@ -/* - * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved. - * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. - * - * This code is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License version 2 only, as - * published by the Free Software Foundation. Oracle designates this - * particular file as subject to the "Classpath" exception as provided - * by Oracle in the LICENSE file that accompanied this code. - * - * This code is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - * version 2 for more details (a copy is included in the LICENSE file that - * accompanied this code). - * - * You should have received a copy of the GNU General Public License version - * 2 along with this work; if not, write to the Free Software Foundation, - * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. - * - * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA - * or visit www.oracle.com if you need additional information or have any - * questions. - */ -package java.util.streams; - -import java.util.Sized; - -/** - * A Streamable that is also Sized. - * - * @param <T> Type of stream elements. - * - * @author Brian Goetz - */ -public interface SizedStreamable<T> extends Streamable<T>, Sized { -}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/share/classes/java/util/streams/StatefulBiSink.java Sat Aug 25 19:03:55 2012 -0400 @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. Oracle designates this + * particular file as subject to the "Classpath" exception as provided + * by Oracle in the LICENSE file that accompanied this code. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ +package java.util.streams; + +import java.util.Mapping; +import java.util.functions.BiSink; + +/** + * StatefulBiSink + * + * @author Brian Goetz + */ +public interface StatefulBiSink<K,V,R> extends BiSink<K,V>, StatefulSink<Mapping<K,V>,R> { +}
--- a/src/share/classes/java/util/streams/Stream.java Mon Aug 20 16:10:37 2012 -0400 +++ b/src/share/classes/java/util/streams/Stream.java Sat Aug 25 19:03:55 2012 -0400 @@ -24,7 +24,8 @@ */ package java.util.streams; -import java.util.Iterator; +import java.util.*; +import java.util.functions.*; /** * A source of elements which may be iterated or mutated. @@ -33,7 +34,9 @@ * * @author Brian Goetz */ -public interface Stream<T> extends StreamOps<T>, Iterator<T> { +public interface Stream<T> { + public enum Shape { LINEAR, KEY_VALUE } + /** * Stream elements are unique. No two elements contained in the stream * are equivalent via {@code equals()} @@ -60,4 +63,45 @@ * Mask of undefined state bits. */ public static final int STATE_UNKNOWN_MASK_V1 = ~(STATE_UNIQUE | STATE_SORTED | STATE_SIZED); + + Iterator<T> iterator(); + + + Stream<T> filter(Predicate<? super T> predicate); + + <R> Stream<R> map(Mapper<? super T, ? extends R> mapper); + + <R> Stream<R> flatMap(FlatMapper<? super T, R> mapper); + + Stream<T> uniqueElements(); + + Stream<T> sorted(Comparator<? super T> comparator); + + Stream<T> cumulate(BinaryOperator<T> operator); + + void forEach(Sink<? super T> sink); + + <A extends Fillable<? super T>> A into(A target); + + Object[] toArray(); + + <U> Map<U, Streamable<T>> groupBy(Mapper<? super T, ? extends U> mapper); + + T reduce(T base, BinaryOperator<T> op);Optional<T> reduce(BinaryOperator<T> op); + + <U> U fold(Factory<U> seedFactory, Combiner<U, T, U> reducer, BinaryOperator<U> combiner); + + boolean anyMatch(Predicate<? super T> predicate); + + boolean allMatch(Predicate<? super T> predicate); + + boolean noneMatch(Predicate<? super T> predicate); + + Optional<T> findFirst(); + + Optional<T> findAny(); + + Stream<T> sequential(); + + <U> MapStream<T, U> mapped(Mapper<? super T, ? extends U> mapper); }
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/share/classes/java/util/streams/StreamAccessor.java Sat Aug 25 19:03:55 2012 -0400 @@ -0,0 +1,52 @@ +/* + * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. Oracle designates this + * particular file as subject to the "Classpath" exception as provided + * by Oracle in the LICENSE file that accompanied this code. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ +package java.util.streams; + +import java.util.Iterator; +import java.util.Spliterator; +import java.util.functions.Sink; + +/** + * StreamAccessor + * + * @author Brian Goetz + */ +public interface StreamAccessor<T> { + void forEach(Sink<? super T> sink); + + Iterator<T> iterator(); + + public int getStreamFlags(); + + public int getSizeOrEstimate(); + + boolean isParallel(); + + boolean isPredictableSplits(); + + Stream.Shape getShape(); + + Spliterator<T> spliterator(); +}
--- a/src/share/classes/java/util/streams/StreamBuilder.java Mon Aug 20 16:10:37 2012 -0400 +++ b/src/share/classes/java/util/streams/StreamBuilder.java Sat Aug 25 19:03:55 2012 -0400 @@ -32,15 +32,14 @@ * * @author Brian Goetz */ -public interface StreamBuilder<T> extends SizedStreamable<T>, Sink<T> /* , Fillable<T> */ { +public interface StreamBuilder<T> extends Sized, Streamable<T>, Traversable<T>, Sink<T> /* , Fillable<T> */ { @Override - int getStreamState(); + int getStreamFlags(); @Override void forEach(Sink<? super T> sink); void clear(); - @Override Object[] toArray(); }
--- a/src/share/classes/java/util/streams/StreamBuilders.java Mon Aug 20 16:10:37 2012 -0400 +++ b/src/share/classes/java/util/streams/StreamBuilders.java Sat Aug 25 19:03:55 2012 -0400 @@ -28,7 +28,6 @@ import java.util.Arrays; import java.util.Iterator; import java.util.List; -import java.util.Objects; import java.util.functions.Sink; /** @@ -52,30 +51,38 @@ return new VariableStreamBuilder<>(initialSize); } + private static boolean equals(StreamBuilder a, StreamBuilder b) { + if (a.size() != b.size()) + return false; + Iterator it1 = a.iterator(); + Iterator it2 = b.iterator(); + while (it1.hasNext()) { + Object x1 = it1.next(); + Object x2 = it2.next(); + if (!x1.equals(x2)) + return false; + } + return true; + } + + private static<T> int hashCode(StreamBuilder<T> sb) { + int h = 0; + for (T t : sb) { + h = 31 * h + t.hashCode(); + } + return h; + } + private static class FixedStreamBuilder<T> implements StreamBuilder<T> { - private final Object[] array; + private final T[] array; private int curSize; public FixedStreamBuilder(int size) { - array = new Object[size]; + array = (T[]) new Object[size]; } @Override - public boolean equals(Object obj) { - if(obj instanceof Streamable) { - return Streams.equals(this, (Streamable<?>) obj); - } - - return false; - } - - @Override - public int hashCode() { - return Streams.hashCode(this); - } - - @Override - public int getStreamState() { + public int getStreamFlags() { return Stream.STATE_SIZED; } @@ -91,13 +98,18 @@ @Override public void forEach(Sink<? super T> sink) { for (int i=0; i<curSize; i++) { - sink.accept((T) array[i]); + sink.accept(array[i]); } } @Override + public Stream<T> stream() { + return Streams.stream(this, size()); + } + + @Override public Iterator<T> iterator() { - return (Iterator<T>) Arrays.iterator( array, 0, curSize); + return Arrays.iterator(array, 0, curSize); } @Override @@ -116,9 +128,27 @@ return array; } else { + // @@@ Should this throw ISE instead? return Arrays.copyOf(array, curSize); } } + + @Override + public int hashCode() { + return StreamBuilders.hashCode(this); + } + + @Override + public boolean equals(Object obj) { + if (!(obj instanceof StreamBuilder)) + return false; + return StreamBuilders.equals(this, (StreamBuilder) obj); + } + + @Override + public String toString() { + return String.format("FixedStreamBuilder[%d][%s]", array.length-curSize, Arrays.toString(array)); + } } private static class VariableStreamBuilder<T> implements StreamBuilder<T> { @@ -134,21 +164,7 @@ } @Override - public boolean equals(Object obj) { - if(obj instanceof Streamable) { - return Streams.equals(this, (Streamable<?>) obj); - } - - return false; - } - - @Override - public int hashCode() { - return Streams.hashCode(this); - } - - @Override - public int getStreamState() { + public int getStreamFlags() { return Stream.STATE_SIZED; } @@ -163,6 +179,11 @@ } @Override + public Stream<T> stream() { + return Streams.stream(this, size()); + } + + @Override public void forEach(Sink<? super T> sink) { list.forEach(sink); } @@ -181,6 +202,23 @@ public Object[] toArray() { return list.toArray(); } + + @Override + public int hashCode() { + return StreamBuilders.hashCode(this); + } + + @Override + public String toString() { + return String.format("VariableStreamBuilder[%s]", list); + } + + @Override + public boolean equals(Object obj) { + if (!(obj instanceof StreamBuilder)) + return false; + return StreamBuilders.equals(this, (StreamBuilder) obj); + } } }
--- a/src/share/classes/java/util/streams/StreamOps.java Mon Aug 20 16:10:37 2012 -0400 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,80 +0,0 @@ -/* - * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved. - * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. - * - * This code is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License version 2 only, as - * published by the Free Software Foundation. Oracle designates this - * particular file as subject to the "Classpath" exception as provided - * by Oracle in the LICENSE file that accompanied this code. - * - * This code is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - * version 2 for more details (a copy is included in the LICENSE file that - * accompanied this code). - * - * You should have received a copy of the GNU General Public License version - * 2 along with this work; if not, write to the Free Software Foundation, - * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. - * - * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA - * or visit www.oracle.com if you need additional information or have any - * questions. - */ -package java.util.streams; - -import java.util.Comparator; -import java.util.Fillable; -import java.util.Map; -import java.util.Optional; -import java.util.functions.*; - -/** - * StreamOps - * - * @params <T> Type of elements. - * - * @author Brian Goetz - */ -public interface StreamOps<T> { - Stream<T> filter(Predicate<? super T> predicate); - - <R> Stream<R> map(Mapper<? super T, ? extends R> mapper); - - <R> Stream<R> flatMap(FlatMapper<? super T, R> mapper); - - Stream<T> uniqueElements(); - - Stream<T> sorted(Comparator<? super T> comparator); - - Stream<T> cumulate(BinaryOperator<T> operator); - - void forEach(Sink<? super T> sink); - - <A extends Fillable<? super T>> A into(A target); - - Object[] toArray(); - - <U> Map<U, SizedStreamable<T>> groupBy(Mapper<? super T, ? extends U> mapper); - -// <U> Map<U, StreamSource<T>> groupByMulti(FlatMapper<? super T, ? extends U> mapper); - - T reduce(T base, BinaryOperator<T> op); - - Optional<T> reduce(BinaryOperator<T> op); - - <U> U fold(Factory<U> seedFactory, Combiner<U, T, U> reducer, BinaryOperator<U> combiner); - - boolean anyMatch(Predicate<? super T> predicate); - - boolean allMatch(Predicate<? super T> predicate); - - boolean noneMatch(Predicate<? super T> predicate); - - Optional<T> findFirst(); - - Optional<T> findAny(); - - <U> MapStream<T, U> mapped(Mapper<? super T, ? extends U> mapper); -}
--- a/src/share/classes/java/util/streams/Streamable.java Mon Aug 20 16:10:37 2012 -0400 +++ b/src/share/classes/java/util/streams/Streamable.java Sat Aug 25 19:03:55 2012 -0400 @@ -35,119 +35,10 @@ * * @author Brian Goetz */ -public interface Streamable<T> extends StreamOps<T>, Traversable<T> { - Stream<T> stream() default { - throw new UnsupportedOperationException("nyi"); - } +public interface Streamable<T> { + Stream<T> stream(); - public int getStreamState() default { + public int getStreamFlags() default { return 0; } - - @Override - void forEach(Sink<? super T> sink) default { - for (T t: this) { - sink.accept(t); - } - } - - @Override - <A extends Fillable<? super T>> A into(A target) default { - for (T t : this) { - target.add(t); - } - return target; - } - - @Override - Object[] toArray() default { - return SequentialPipeline.eagerOp(this, ToArrayOp.<T>singleton()); - } - - @Override - <K> Map<K, SizedStreamable<T>> groupBy(Mapper<? super T, ? extends K> mapper) default { - return SequentialPipeline.eagerOp(this, new GroupByOp<>(mapper)); - } - - @Override - Stream<T> filter(Predicate<? super T> predicate) default { - return new SequentialPipeline<>(this, new FilterOp<>(predicate)); - } - - @Override - <R> Stream<R> map(Mapper<? super T, ? extends R> mapper) default { - return new SequentialPipeline<>(this, new MapOp<>(mapper)); - } - - @Override - <R> Stream<R> flatMap(FlatMapper<? super T, R> mapper) default { - return new SequentialPipeline<>(this, new FlatMapOp<>(mapper)); - } - - @Override - Stream<T> uniqueElements() default { - return new SequentialPipeline<>(this, new UniqOp<T>()); - } - - @Override - Stream<T> sorted(Comparator<? super T> comparator) default { - return new SequentialPipeline<>(this, new SortedOp<>(comparator)); - } - - @Override - Stream<T> cumulate(BinaryOperator<T> operator) default { - return new SequentialPipeline<>(this, new CumulateOp<>(operator)); - } - - @Override - public T reduce(final T base, BinaryOperator<T> op) default { - // @@@ Temporary hack to work around compiler error - Factory<T> seedFactory = new Factory<T>() { - @Override - public T make() { - return base; - } - }; - return fold(seedFactory, op, op); - } - - @Override - public Optional<T> reduce(BinaryOperator<T> op) default { - return SequentialPipeline.eagerOp(this, new SeedlessFoldOp<>(op)); - } - - @Override - public <U> U fold(Factory<U> seedFactory, Combiner<U, T, U> reducer, BinaryOperator<U> combiner) default { - return SequentialPipeline.eagerOp(this, new FoldOp<>(seedFactory, reducer, combiner)); - } - - @Override - Optional<T> findFirst() default { - return SequentialPipeline.eagerOp(this, new FindFirstOp<T>()); - } - - @Override - <R> MapStream<T, R> mapped(Mapper<? super T, ? extends R> mapper) default { - return new SequentialMapPipeline<>(this, new MappedOp<>(mapper)); - } - - @Override - Optional<T> findAny() default { - return SequentialPipeline.eagerOp(this, new FindAnyOp<T>()); - } - - @Override - boolean anyMatch(Predicate<? super T> predicate) default { - return SequentialPipeline.eagerOp(this, new AnyMatchOp<>(predicate)); - } - - @Override - boolean allMatch(Predicate<? super T> predicate) default { - return SequentialPipeline.eagerOp(this, new AllMatchOp<>(predicate)); - } - - @Override - boolean noneMatch(Predicate<? super T> predicate) default { - return SequentialPipeline.eagerOp(this, new NoneMatchOp<>(predicate)); - } }
--- a/src/share/classes/java/util/streams/Streams.java Mon Aug 20 16:10:37 2012 -0400 +++ b/src/share/classes/java/util/streams/Streams.java Sat Aug 25 19:03:55 2012 -0400 @@ -37,63 +37,44 @@ throw new Error("no instances"); } - public static<T, U extends Iterator<T>> Streamable<T> asStreamable(final U iterator) { - return new Streamable<T>() { - @Override - public Iterator<T> iterator() { - return iterator; - } - }; + public static<T> Stream<T> stream(Collection<T> source) { + return new LinearPipeline<>(new TraversableStreamAccessor<>(source, source.size())); } - public static<T, U extends Iterable<T> & Sized> SizedStreamable<T> concatenate(final U left, final U right) { - return new SizedStreamable<T>() { - int size = 0; - - @Override - public int size() { - if (size == 0) { - size = left.size() + right.size(); - } - return size; - } - - @Override - public Iterator<T> iterator() { - return concatenate(left.iterator(), right.iterator()); - } - }; + public static<T> Stream<T> stream(Traversable<T> source) { + return new LinearPipeline<>(new TraversableStreamAccessor<>(source)); } - public static<T> Iterator<T> concatenate(final Iterator<? extends T> left, final Iterator<? extends T> right) { - return new Iterator<T>() { - private Iterator<? extends T> it = left; - private boolean switched = false; + public static<T> Stream<T> stream(Traversable<T> source, int knownSize) { + return new LinearPipeline<>(new TraversableStreamAccessor<>(source, knownSize)); + } - @Override - public boolean hasNext() { - if (it.hasNext()) { - return true; - } else if (switched) { - return false; - } else { - switched = true; - it = right; - return it.hasNext(); - } - } + public static<T> Stream<T> stream(Iterable<T> source) { + return stream(source.iterator()); + } - @Override - public T next() { - if (hasNext()) { - return it.next(); - } else { - throw new NoSuchElementException(); - } - } - }; + public static<T> Stream<T> stream(Iterable<T> source, int knownSize) { + return stream(source.iterator(), knownSize); } + public static<T> Stream<T> stream(Iterator<T> source) { + return new LinearPipeline<>(new IteratorStreamAccessor<>(source)); + } + + public static<T> Stream<T> stream(Iterator<T> source, int knownSize) { + return new LinearPipeline<>(new IteratorStreamAccessor<>(source, knownSize)); + } + + public static<T> Stream<T> parallel(Spliterator<T> source) { + return new LinearPipeline<>(new SpliteratorStreamAccessor<>(source)); + } + + public static<T> Stream<T> parallel(Spliterator<T> source, int knownSize) { + return new LinearPipeline<>(new SpliteratorStreamAccessor<>(source, knownSize)); + } + + // @@@ Need from(StreamAccessor) methods + public static<T> Spliterator<T> emptySpliterator() { return new Spliterator<T>() { @Override @@ -126,50 +107,153 @@ }; } - public static<T, U extends Traversable<T> & Sized> Spliterator<T> spliterator(U source) { - return new SizedStreamableSpliterator<>(source); - } + private static class IteratorStreamAccessor<T> + extends AbstractSequentialStreamAccessor<T> + implements StreamAccessor<T>, Iterator<T> { + private final Iterator<T> it; + private final int size; - private static class SizedStreamableSpliterator<T, U extends Traversable<T> & Sized> implements Spliterator<T> { - private final U source; - private Iterator<T> iterator; + public IteratorStreamAccessor(Iterator<T> it) { + this.it = it; + this.size = -1; + } - private SizedStreamableSpliterator(U source) { - this.source = source; + public IteratorStreamAccessor(Iterator<T> it, int knownSize) { + this.it = it; + this.size = knownSize; } @Override - public Spliterator<T> split() { - return emptySpliterator(); + public void forEach(Sink<? super T> sink) { + while (it.hasNext()) + sink.accept(it.next()); } - private Iterator<T> iterator() { - if (iterator == null) { - iterator = source.iterator(); - } + @Override + public T next() { + return it.next(); + } + + @Override + public boolean hasNext() { + return it.hasNext(); + } + + @Override + public Iterator<T> iterator() { + return it; + } + + @Override + public int getStreamFlags() { + int flags = super.getStreamFlags(); + if (size >= 0) + flags |= Stream.STATE_SIZED; + return flags; + } + + @Override + public int getSizeOrEstimate() { + return (size >= 0) ? size : super.getSizeOrEstimate(); + } + } + + static class SpliteratorStreamAccessor<T> implements StreamAccessor<T> { + private final Spliterator<T> spliterator; + private final int size; + + public SpliteratorStreamAccessor(Spliterator<T> spliterator) { + this.spliterator = spliterator; + this.size = -1; + } + + public SpliteratorStreamAccessor(Spliterator<T> spliterator, int knownSize) { + this.spliterator = spliterator; + this.size = knownSize; + } + + @Override + public void forEach(Sink<? super T> sink) { + spliterator.forEach(sink); + } + + @Override + public int getStreamFlags() { + int flags = 0; + if (size >= 0) + flags |= Stream.STATE_SIZED; + return flags; + } + + @Override + public int getSizeOrEstimate() { + return (size >= 0) ? size : -Integer.MAX_VALUE; + } + + @Override + public Iterator<T> iterator() { + return spliterator; + } + + @Override + public Spliterator<T> spliterator() { + return spliterator; + } + + @Override + public Stream.Shape getShape() { + return Stream.Shape.LINEAR; + } + + @Override + public boolean isPredictableSplits() { + return spliterator.isPredictableSplits(); + } + + @Override + public boolean isParallel() { + return true; + } + } + + private static class TraversableStreamAccessor<T> + extends AbstractSequentialStreamAccessor<T> + implements StreamAccessor<T>, Iterator<T> { + private final Traversable<T> traversable; + private final int size; + Iterator<T> iterator = null; + + TraversableStreamAccessor(Traversable<T> traversable) { + this.traversable = traversable; + this.size = -1; + } + + TraversableStreamAccessor(Traversable<T> traversable, int knownSize) { + this.traversable = traversable; + this.size = knownSize; + } + + public Iterator<T> iterator() { + if (iterator == null) + iterator = traversable.iterator(); return iterator; } @Override public void forEach(Sink<? super T> sink) { if (iterator == null) { - source.forEach(sink); + traversable.forEach(sink); iterator = Collections.emptyIterator(); - } else { - while (iterator.hasNext()) { + } + else { + while (iterator.hasNext()) sink.accept(iterator.next()); - } } } @Override - public int getRemainingSizeIfKnown() { - return iterator == null ? source.size() : -1; - } - - @Override - public boolean isPredictableSplits() { - return true; + public T next() { + return iterator().next(); } @Override @@ -178,39 +262,16 @@ } @Override - public T next() { - return iterator().next(); + public int getStreamFlags() { + int flags = super.getStreamFlags(); + if (size >= 0) + flags |= Stream.STATE_SIZED; + return flags; + } + + @Override + public int getSizeOrEstimate() { + return (size >= 0) ? size : super.getSizeOrEstimate(); } } - - public static boolean equals(Streamable<?> one, Streamable<?> two) { - Iterator<?> each = two.iterator(); - for (Object first : one) { - if (!each.hasNext()) { - return false; - } - - Object second = each.next(); - - if (!Objects.equals(first, second)) { - return false; - } - } - - if (each.hasNext()) { - return false; - } - - return true; - } - - public static int hashCode(Streamable<?> streamable) { - int result = -1; - - for(Object each : streamable) { - result = 17 * (result ^ Objects.hashCode(each)); - } - - return result; - } }
--- a/src/share/classes/java/util/streams/ops/AllMatchOp.java Mon Aug 20 16:10:37 2012 -0400 +++ b/src/share/classes/java/util/streams/ops/AllMatchOp.java Sat Aug 25 19:03:55 2012 -0400 @@ -27,14 +27,13 @@ import java.util.Iterator; import java.util.Objects; import java.util.functions.Predicate; -import java.util.streams.ParallelStreamable; /** * AllMatchOp * * @author Brian Goetz */ -public class AllMatchOp<T> implements ShortCircuitEagerOp<T, Boolean> { +public class AllMatchOp<T> implements ShortCircuitTerminalOp<T, Boolean> { private final Predicate<? super T> predicate; public AllMatchOp(Predicate<? super T> predicate) { @@ -57,7 +56,7 @@ } @Override - public <V> Boolean computeParallel(ParallelStreamable<V> source, ParallelOpHelper<T, V> helper) { + public <V> Boolean computeParallel(ParallelOpHelper<T, V> helper) { throw new UnsupportedOperationException("nyi"); } }
--- a/src/share/classes/java/util/streams/ops/AnyMatchOp.java Mon Aug 20 16:10:37 2012 -0400 +++ b/src/share/classes/java/util/streams/ops/AnyMatchOp.java Sat Aug 25 19:03:55 2012 -0400 @@ -27,7 +27,6 @@ import java.util.Iterator; import java.util.Objects; import java.util.functions.Predicate; -import java.util.streams.ParallelStreamable; /** * AnyMatchOp @@ -36,7 +35,7 @@ * * @author Brian Goetz */ -public class AnyMatchOp<T> implements ShortCircuitEagerOp<T, Boolean> { +public class AnyMatchOp<T> implements ShortCircuitTerminalOp<T, Boolean> { private final Predicate<? super T> predicate; public AnyMatchOp(Predicate<? super T> predicate) { @@ -59,7 +58,7 @@ } @Override - public <V> Boolean computeParallel(ParallelStreamable<V> source, ParallelOpHelper<T, V> helper) { + public <V> Boolean computeParallel(ParallelOpHelper<T, V> helper) { throw new UnsupportedOperationException("nyi"); } }
--- a/src/share/classes/java/util/streams/ops/BiAllMatchOp.java Mon Aug 20 16:10:37 2012 -0400 +++ b/src/share/classes/java/util/streams/ops/BiAllMatchOp.java Sat Aug 25 19:03:55 2012 -0400 @@ -29,13 +29,12 @@ import java.util.Mapping; import java.util.Objects; import java.util.functions.BiPredicate; -import java.util.streams.ParallelStreamable; /** * BiAllMatchOp * */ -public class BiAllMatchOp<K,V> implements ShortCircuitEagerOp<Mapping<K,V>, Boolean> { +public class BiAllMatchOp<K,V> implements ShortCircuitTerminalOp<Mapping<K,V>, Boolean> { private final BiPredicate<? super K, ? super V> predicate; public BiAllMatchOp(BiPredicate<? super K, ? super V> predicate) {
--- a/src/share/classes/java/util/streams/ops/BiAnyMatchOp.java Mon Aug 20 16:10:37 2012 -0400 +++ b/src/share/classes/java/util/streams/ops/BiAnyMatchOp.java Sat Aug 25 19:03:55 2012 -0400 @@ -29,13 +29,12 @@ import java.util.Mapping; import java.util.Objects; import java.util.functions.BiPredicate; -import java.util.streams.ParallelStreamable; /** * BiAnyMatchOp * */ -public class BiAnyMatchOp<K,V> implements ShortCircuitEagerOp<Mapping<K,V>, Boolean> { +public class BiAnyMatchOp<K,V> implements ShortCircuitTerminalOp<Mapping<K,V>, Boolean> { private final BiPredicate<? super K, ? super V> predicate; public BiAnyMatchOp(BiPredicate<? super K, ? super V> predicate) {
--- a/src/share/classes/java/util/streams/ops/BiFilterOp.java Mon Aug 20 16:10:37 2012 -0400 +++ b/src/share/classes/java/util/streams/ops/BiFilterOp.java Sat Aug 25 19:03:55 2012 -0400 @@ -47,7 +47,7 @@ } @Override - public int getStreamState(int upstreamState) { + public int getStreamFlags(int upstreamState) { return upstreamState & ~(Stream.STATE_SIZED | Stream.STATE_UNKNOWN_MASK_V1); } @@ -64,9 +64,9 @@ @Override public Sink<Mapping<K,V>> sink(final Sink<? super Mapping<K,V>> sink) { if (!(sink instanceof BiSink)) { - throw new IllegalStateException("Expecting BiSink"); + throw new IllegalArgumentException(String.format("Expected argument sink to be of type BiSync; found %s", sink.getClass())); } - final BiSink<K, V> biSink = (BiSink<K, V>) (Sink) Objects.requireNonNull(sink); + final BiSink<K, V> biSink = (BiSink<K, V>) (BiSink) Objects.requireNonNull(sink); return new BiSink<K,V>() { @Override @@ -84,7 +84,7 @@ }; } - public static <K, V> MapIterator<K, V> iterator(Iterator<? super Mapping<K, V>> source, BiPredicate<? super K, ? super V> predicate) { + public static <K, V> MapIterator<K, V> iterator(Iterator<Mapping<K, V>> source, BiPredicate<? super K, ? super V> predicate) { return new FilteredIterator<>(source, predicate); } @@ -94,7 +94,7 @@ private boolean nextReady = false; - FilteredIterator(Iterator<? super Mapping<K, V>> source, BiPredicate<? super K, ? super V> predicate) { + FilteredIterator(Iterator<Mapping<K, V>> source, BiPredicate<? super K, ? super V> predicate) { super(source); this.predicate = Objects.requireNonNull(predicate); } @@ -102,7 +102,7 @@ @Override public boolean hasNext() { while (!nextReady && source.hasNext()) { - current = (Mapping<K, V>)source.next(); + current = source.next(); nextReady = predicate.eval(current.getKey(), current.getValue()); }
--- a/src/share/classes/java/util/streams/ops/BiMapOp.java Mon Aug 20 16:10:37 2012 -0400 +++ b/src/share/classes/java/util/streams/ops/BiMapOp.java Sat Aug 25 19:03:55 2012 -0400 @@ -52,7 +52,7 @@ } @Override - public int getStreamState(int upstreamState) { + public int getStreamFlags(int upstreamState) { return upstreamState & ~(Stream.STATE_SORTED | Stream.STATE_UNIQUE | Stream.STATE_UNKNOWN_MASK_V1); } @@ -67,7 +67,7 @@ } @Override - public Sink<Mapping<K,V>> sink(final Sink<? super Mapping<K,U>> sink) { + public BiSink<K,V> sink(final Sink<? super Mapping<K,U>> sink) { if (!(sink instanceof BiSink)) { throw new IllegalStateException("Expecting BiSink"); }
--- a/src/share/classes/java/util/streams/ops/CumulateOp.java Mon Aug 20 16:10:37 2012 -0400 +++ b/src/share/classes/java/util/streams/ops/CumulateOp.java Sat Aug 25 19:03:55 2012 -0400 @@ -46,7 +46,7 @@ } @Override - public int getStreamState(int upstreamState) { + public int getStreamFlags(int upstreamState) { return upstreamState & ~(Stream.STATE_SORTED | Stream.STATE_UNIQUE | Stream.STATE_UNKNOWN_MASK_V1); } @@ -110,25 +110,8 @@ } @Override - public <V> ParallelStreamable<T> computeParallel(ParallelStreamable<V> source, ParallelOpHelper<T, V> helper) { - ForkJoinTask<Node<T>> task = new CumulateTask<>(helper.suggestDepth(source), source.spliterator(), op, helper); - final Node<T> node = helper.invoke(task); - return new ParallelStreamable<T>() { - @Override - public int size() { - return node.size(); - } - - @Override - public Iterator<T> iterator() { - return node.iterator(); - } - - @Override - public Spliterator<T> spliterator() { - return node.spliterator(); - } - }; + public <V> TreeUtils.Node<T> computeParallel(ParallelOpHelper<T, V> helper) { + return helper.invoke(new CumulateTask<>(helper.suggestDepth(), helper.spliterator(), op, helper)); } private static class Problem<T,V> { @@ -144,7 +127,7 @@ } } - private class CumulateTask<V> extends RecursiveTask<Node<T>> { + private class CumulateTask<V> extends RecursiveTask<TreeUtils.Node<T>> { private final Problem<T,V> problem; private final int depth; private final Spliterator<V> source; @@ -175,7 +158,7 @@ } @Override - protected Node<T> compute() { + protected TreeUtils.Node<T> compute() { switch (problem.pass) { case 0: if (depth != 0) { @@ -199,7 +182,7 @@ upward = sink.end(); // Special case -- if problem.depth == 0, just wrap the result and be done if (isRoot()) - return new LeafNode<>(Arrays.asList((T[]) leafData.toArray())); + return TreeUtils.node(leafData); } return null; @@ -216,9 +199,9 @@ right.downward = problem.op.operate(downward, left.upward); } right.fork(); - Node<T> leftResult = left.compute(); - Node<T> rightResult = right.join(); - return new InternalNode<>(leftResult, rightResult); + TreeUtils.Node<T> leftResult = left.compute(); + TreeUtils.Node<T> rightResult = right.join(); + return TreeUtils.node(leftResult, rightResult); } else { // @@@ Pretty inefficient; should update in place when we have a better StreamBuilder representation @@ -230,7 +213,7 @@ for (T t : leafData) result.accept(problem.op.operate(downward, t)); } - return new LeafNode<>(result); + return TreeUtils.node(result); } default: @@ -238,132 +221,4 @@ } } } - - private static interface Node<T> extends Traversable<T>, Splittable<T>, Sized { - } - - private static class LeafNode<T> implements Node<T> { - private final SizedStreamable<T> data; - int size = 0; - - private LeafNode(SizedStreamable<T> data) { - this.data = data; - } - - @Override - public int size() { - if (size == 0) - size = data.size(); - return size; - } - - @Override - public void forEach(Sink<? super T> sink) { - data.forEach(sink); - } - - @Override - public Iterator<T> iterator() { - return data.iterator(); - } - - @Override - public Spliterator<T> spliterator() { - return Streams.spliterator(data); - } - } - - private static class InternalNode<T> implements Node<T> { - private final Node<T> left, right; - int size = 0; - - private InternalNode(Node<T> left, Node<T> right) { - this.left = left; - this.right = right; - } - - @Override - public int size() { - if (size == 0) - size = left.size() + right.size(); - return size; - } - - @Override - public void forEach(Sink<? super T> sink) { - left.forEach(sink); - right.forEach(sink); - } - - @Override - public Iterator<T> iterator() { - return Streams.concatenate(left.iterator(), right.iterator()); - } - - @Override - public Spliterator<T> spliterator() { - return new InternalNodeSpliterator<>(this); - } - - private static class InternalNodeSpliterator<T> implements Spliterator<T> { - private Node<T> cur; - private Iterator<T> iterator; - - private InternalNodeSpliterator(InternalNode<T> cur) { - this.cur = cur; - } - - private Iterator<T> iterator() { - if (iterator == null) - iterator = cur.iterator(); - return iterator; - } - - @Override - public Spliterator<T> split() { - if (iterator != null) - throw new IllegalStateException(); - if (cur instanceof InternalNode) { - InternalNode<T> internalNode = (InternalNode<T>) cur; - Spliterator<T> ret = internalNode.left.spliterator(); - cur = internalNode.right; - return ret; - } - else - return Streams.emptySpliterator(); - } - - @Override - public void forEach(Sink<? super T> sink) { - if (iterator == null) { - cur.forEach(sink); - iterator = Collections.emptyIterator(); - } - else { - while (iterator.hasNext()) - sink.accept(iterator.next()); - } - } - - @Override - public int getRemainingSizeIfKnown() { - return (iterator == null) ? cur.size() : -1; - } - - @Override - public boolean isPredictableSplits() { - return true; - } - - @Override - public boolean hasNext() { - return iterator().hasNext(); - } - - @Override - public T next() { - return iterator().next(); - } - } - } }
--- a/src/share/classes/java/util/streams/ops/EagerOp.java Mon Aug 20 16:10:37 2012 -0400 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,50 +0,0 @@ -/* - * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved. - * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. - * - * This code is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License version 2 only, as - * published by the Free Software Foundation. Oracle designates this - * particular file as subject to the "Classpath" exception as provided - * by Oracle in the LICENSE file that accompanied this code. - * - * This code is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - * version 2 for more details (a copy is included in the LICENSE file that - * accompanied this code). - * - * You should have received a copy of the GNU General Public License version - * 2 along with this work; if not, write to the Free Software Foundation, - * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. - * - * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA - * or visit www.oracle.com if you need additional information or have any - * questions. - */ -package java.util.streams.ops; - -import java.util.Spliterator; -import java.util.functions.Sink; -import java.util.streams.ParallelStreamable; -import java.util.streams.StatefulSink; - -/** - * EagerOp - * - * @author Brian Goetz - */ -public interface EagerOp<T, U> extends ParallelOp<T, U> { - public StatefulSink<T, U> sink(); - - @Override - <V> U computeParallel(ParallelStreamable<V> source, ParallelOpHelper<T, V> helper) default { - // dumb default serial version - StatefulSink<T, U> sSink = sink(); - Sink<V> vSink = helper.sink(sSink); - Spliterator<V> spliterator = source.spliterator(); - sSink.begin(spliterator.getRemainingSizeIfKnown()); - spliterator.forEach(vSink); - return sSink.end(); - } -}
--- a/src/share/classes/java/util/streams/ops/ElementwiseOp.java Mon Aug 20 16:10:37 2012 -0400 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,75 +0,0 @@ -/* - * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved. - * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. - * - * This code is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License version 2 only, as - * published by the Free Software Foundation. Oracle designates this - * particular file as subject to the "Classpath" exception as provided - * by Oracle in the LICENSE file that accompanied this code. - * - * This code is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - * version 2 for more details (a copy is included in the LICENSE file that - * accompanied this code). - * - * You should have received a copy of the GNU General Public License version - * 2 along with this work; if not, write to the Free Software Foundation, - * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. - * - * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA - * or visit www.oracle.com if you need additional information or have any - * questions. - */ -package java.util.streams.ops; - -import java.util.Iterator; -import java.util.functions.Sink; - -/** - * An operation performed upon elements. - * - * @param <T> Type of input elements to the operation. - * @param <U> Type of output elements to the operation. - * - * @author Brian Goetz - */ -public interface ElementwiseOp<T,U> { - - /** - * Return the resultant state of a stream utilizing this operation - * considering the state of the provided stream. - * - * @param upstreamState stream state of the upstream stream. - * @return the resultant state of a stream utilizing this operation - * considering the state of the provided stream. - */ - int getStreamState(int upstreamState); - - /** - * If {@code true} then operation is stateful and accumulates state. - * - * @return {@code true} then operation is stateful and accumulates state. - */ - boolean isStateful(); - - /** - * Return an iterator of the elements of the stream. The operation will be - * performed upon each element as it is returned by {@code Iterator.next()}. - * - * @param in the source stream. - * @return an iterator of the elements of the stream. - */ - Iterator<U> iterator(Iterator<T> in); - - /** - * Return a sink which will accept elements, perform the operation upon - * each element and send it to the provided sink. - * - * @param sink elements will be sent to this sink after the processing. - * @return a sink which will accept elements and perform the operation upon - * each element. - */ - Sink<T> sink(Sink<? super U> sink); -}
--- a/src/share/classes/java/util/streams/ops/FilterOp.java Mon Aug 20 16:10:37 2012 -0400 +++ b/src/share/classes/java/util/streams/ops/FilterOp.java Sat Aug 25 19:03:55 2012 -0400 @@ -45,7 +45,7 @@ } @Override - public int getStreamState(int upstreamState) { + public int getStreamFlags(int upstreamState) { return upstreamState & ~(Stream.STATE_SIZED | Stream.STATE_UNKNOWN_MASK_V1); }
--- a/src/share/classes/java/util/streams/ops/FindAnyOp.java Mon Aug 20 16:10:37 2012 -0400 +++ b/src/share/classes/java/util/streams/ops/FindAnyOp.java Sat Aug 25 19:03:55 2012 -0400 @@ -26,14 +26,20 @@ import java.util.Iterator; import java.util.Optional; -import java.util.streams.ParallelStreamable; /** * FindAnyOp * * @author Brian Goetz */ -public class FindAnyOp<T> implements ShortCircuitEagerOp<T, Optional<T>> { +public class FindAnyOp<T> implements ShortCircuitTerminalOp<T, Optional<T>> { + private final static FindAnyOp<?> INSTANCE = new FindAnyOp<>(); + + @SuppressWarnings("unchecked") + public static <T> FindAnyOp<T> singleton() { + return (FindAnyOp<T>) INSTANCE; + } + @Override public Optional<T> evaluate(Iterator<T> iterator) { return iterator.hasNext() ? new Optional<>(iterator.next()) : Optional.<T>empty();
--- a/src/share/classes/java/util/streams/ops/FindFirstOp.java Mon Aug 20 16:10:37 2012 -0400 +++ b/src/share/classes/java/util/streams/ops/FindFirstOp.java Sat Aug 25 19:03:55 2012 -0400 @@ -26,14 +26,20 @@ import java.util.Iterator; import java.util.Optional; -import java.util.streams.ParallelStreamable; /** * FindFirstOp * * @author Brian Goetz */ -public class FindFirstOp<T> implements ShortCircuitEagerOp<T, Optional<T>> { +public class FindFirstOp<T> implements ShortCircuitTerminalOp<T, Optional<T>> { + private final static FindFirstOp<?> INSTANCE = new FindFirstOp<>(); + + @SuppressWarnings("unchecked") + public static <T> FindFirstOp<T> singleton() { + return (FindFirstOp<T>) INSTANCE; + } + @Override public Optional<T> evaluate(Iterator<T> iterator) { return iterator.hasNext() ? new Optional<>(iterator.next()) : Optional.<T>empty();
--- a/src/share/classes/java/util/streams/ops/FlatMapOp.java Mon Aug 20 16:10:37 2012 -0400 +++ b/src/share/classes/java/util/streams/ops/FlatMapOp.java Sat Aug 25 19:03:55 2012 -0400 @@ -47,7 +47,7 @@ } @Override - public int getStreamState(int upstreamState) { + public int getStreamFlags(int upstreamState) { return upstreamState & ~(Stream.STATE_SORTED | Stream.STATE_UNIQUE | Stream.STATE_SIZED | Stream.STATE_UNKNOWN_MASK_V1); }
--- a/src/share/classes/java/util/streams/ops/FoldOp.java Mon Aug 20 16:10:37 2012 -0400 +++ b/src/share/classes/java/util/streams/ops/FoldOp.java Sat Aug 25 19:03:55 2012 -0400 @@ -27,7 +27,6 @@ import java.util.Spliterator; import java.util.concurrent.RecursiveTask; import java.util.functions.*; -import java.util.streams.ParallelStreamable; import java.util.streams.StatefulSink; /** @@ -35,7 +34,7 @@ * * @author Brian Goetz */ -public class FoldOp<T, U> implements EagerOp<T, U> { +public class FoldOp<T, U> implements TerminalOp<T, U> { private final Factory<U> seedFactory; private final Combiner<U, T, U> reducer; private final BinaryOperator<U> combiner; @@ -79,8 +78,8 @@ } @Override - public <V> U computeParallel(ParallelStreamable<V> source, ParallelOpHelper<T, V> helper) { - return helper.invoke(new ReduceTask<>(helper.suggestDepth(source), source.spliterator(), this, helper)); + public <V> U computeParallel(ParallelOpHelper<T, V> helper) { + return helper.invoke(new ReduceTask<>(helper.suggestDepth(), helper.spliterator(), this, helper)); } private static class ReduceTask<T, U, V> extends RecursiveTask<U> {
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/share/classes/java/util/streams/ops/ForEachOp.java Sat Aug 25 19:03:55 2012 -0400 @@ -0,0 +1,86 @@ +/* + * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. Oracle designates this + * particular file as subject to the "Classpath" exception as provided + * by Oracle in the LICENSE file that accompanied this code. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ +package java.util.streams.ops; + +import java.util.Spliterator; +import java.util.concurrent.RecursiveAction; +import java.util.functions.Sink; +import java.util.streams.StatefulSink; + +/** + * ForEachOp + * + * @author Brian Goetz + */ +public class ForEachOp<T> implements TerminalOp<T,Void> { + private final Sink<? super T> sink; + + public ForEachOp(Sink<? super T> sink) { + this.sink = sink; + } + + @Override + public StatefulSink<T, Void> sink() { + return (StatefulSink<T, Void>) sink.asStatefulSink(); + } + + @Override + public <V> Void computeParallel(ParallelOpHelper<T, V> helper) { + int depth = helper.suggestDepth(); + Sink<V> compoundSink = helper.sink(sink); + Spliterator<V> spliterator = helper.spliterator(); + if (depth == 0) { + spliterator.forEach(compoundSink); + } else { + helper.invoke(new ForEachTask<>(depth, spliterator, compoundSink)); + } + return null; + } + + private static class ForEachTask<T> extends RecursiveAction { + private final int depth; + private final Spliterator<T> spliterator; + private final Sink<T> sink; + + private ForEachTask(int depth, Spliterator<T> spliterator, Sink<T> sink) { + this.depth = depth; + this.spliterator = spliterator; + this.sink = sink; + } + + @Override + protected void compute() { + if (depth == 0) { + spliterator.forEach(sink); + } else { + ForEachTask<T> left = new ForEachTask<>(depth - 1, spliterator.split(), sink); + ForEachTask<T> right = new ForEachTask<>(depth - 1, spliterator, sink); + right.fork(); + left.compute(); + right.join(); + } + } + } +}
--- a/src/share/classes/java/util/streams/ops/GroupByOp.java Mon Aug 20 16:10:37 2012 -0400 +++ b/src/share/classes/java/util/streams/ops/GroupByOp.java Sat Aug 25 19:03:55 2012 -0400 @@ -37,7 +37,7 @@ * * @author Brian Goetz */ -public class GroupByOp<T, K> implements EagerOp<T, Map<K, SizedStreamable<T>>> { +public class GroupByOp<T, K> implements TerminalOp<T, Map<K, Streamable<T>>> { private final Mapper<? super T, ? extends K> mapper; public GroupByOp(Mapper<? super T, ? extends K> mapper) { @@ -45,8 +45,8 @@ } @Override - public StatefulSink<T, Map<K, SizedStreamable<T>>> sink() { - return new StatefulSink<T, Map<K, SizedStreamable<T>>>() { + public StatefulSink<T, Map<K, Streamable<T>>> sink() { + return new StatefulSink<T, Map<K, Streamable<T>>>() { private final Map<K, StreamBuilder<T>> map = new HashMap<>(); @Override @@ -55,7 +55,7 @@ } @Override - public Map<K, SizedStreamable<T>> end() { + public Map<K, Streamable<T>> end() { return (Map) map; }
--- a/src/share/classes/java/util/streams/ops/IdOp.java Mon Aug 20 16:10:37 2012 -0400 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,53 +0,0 @@ -/* - * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved. - * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. - * - * This code is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License version 2 only, as - * published by the Free Software Foundation. Oracle designates this - * particular file as subject to the "Classpath" exception as provided - * by Oracle in the LICENSE file that accompanied this code. - * - * This code is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - * version 2 for more details (a copy is included in the LICENSE file that - * accompanied this code). - * - * You should have received a copy of the GNU General Public License version - * 2 along with this work; if not, write to the Free Software Foundation, - * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. - * - * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA - * or visit www.oracle.com if you need additional information or have any - * questions. - */ -package java.util.streams.ops; - -import java.util.Iterator; -import java.util.functions.Sink; -import java.util.streams.Stream; - -/** - * IdOp - * - * @author Brian Goetz - */ -public class IdOp<T> implements StatelessOp<T,T> { - @Override - public int getStreamState(int upstreamState) { - return upstreamState & ~Stream.STATE_UNKNOWN_MASK_V1; - } - - @Override - public Iterator<T> iterator(Iterator<T> in) { - return in; - } - - @Override - public Sink<T> sink(final Sink<? super T> sink) { - return (Sink<T>) sink; - } - - // No need to support fused operations; cheaper to just call iterator/sink -}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/share/classes/java/util/streams/ops/IntermediateOp.java Sat Aug 25 19:03:55 2012 -0400 @@ -0,0 +1,86 @@ +/* + * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. Oracle designates this + * particular file as subject to the "Classpath" exception as provided + * by Oracle in the LICENSE file that accompanied this code. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ +package java.util.streams.ops; + +import java.util.Iterator; +import java.util.functions.Sink; +import java.util.streams.Stream; + +/** + * An operation performed upon elements. + * + * @param <T> Type of input elements to the operation. + * @param <U> Type of output elements to the operation. + * + * @author Brian Goetz + */ +public interface IntermediateOp<T,U> { + + /** + * Return the resultant state of a stream utilizing this operation + * considering the state of the provided stream. + * + * @param upstreamState stream state of the upstream stream. + * @return the resultant state of a stream utilizing this operation + * considering the state of the provided stream. + */ + int getStreamFlags(int upstreamState); + + /** + * If {@code true} then operation is stateful and accumulates state. + * + * @return {@code true} then operation is stateful and accumulates state. + */ + boolean isStateful(); + + /** + * If {@code true}, then evaluation of the pipeline is constrained to be + * pull-oriented. This is true for operations that may truncate or otherwise + * manipulate the stream contents. + */ + boolean isShortCircuit() default { return false; } + + /** + * Return an iterator of the elements of the stream. The operation will be + * performed upon each element as it is returned by {@code Iterator.next()}. + * + * @param in the source stream. + * @return an iterator of the elements of the stream. + */ + Iterator<U> iterator(Iterator<T> in); + + /** + * Return a sink which will accept elements, perform the operation upon + * each element and send it to the provided sink. + * + * @param sink elements will be sent to this sink after the processing. + * @return a sink which will accept elements and perform the operation upon + * each element. + */ + Sink<T> sink(Sink<? super U> sink); + + Stream.Shape inputShape() default { return Stream.Shape.LINEAR; } + Stream.Shape outputShape() default { return Stream.Shape.LINEAR; } +}
--- a/src/share/classes/java/util/streams/ops/MapExtractKeysOp.java Mon Aug 20 16:10:37 2012 -0400 +++ b/src/share/classes/java/util/streams/ops/MapExtractKeysOp.java Sat Aug 25 19:03:55 2012 -0400 @@ -37,7 +37,7 @@ */ public class MapExtractKeysOp<K,V> implements StatelessOp<Mapping<K,V>, K> { @Override - public int getStreamState(int upstreamState) { + public int getStreamFlags(int upstreamState) { return upstreamState & ~Stream.STATE_UNKNOWN_MASK_V1; }
--- a/src/share/classes/java/util/streams/ops/MapExtractValuesOp.java Mon Aug 20 16:10:37 2012 -0400 +++ b/src/share/classes/java/util/streams/ops/MapExtractValuesOp.java Sat Aug 25 19:03:55 2012 -0400 @@ -37,7 +37,7 @@ */ public class MapExtractValuesOp<K,V> implements StatelessOp<Mapping<K,V>, V> { @Override - public int getStreamState(int upstreamState) { + public int getStreamFlags(int upstreamState) { return upstreamState & ~Stream.STATE_UNKNOWN_MASK_V1; }
--- a/src/share/classes/java/util/streams/ops/MapFilterKeysOp.java Mon Aug 20 16:10:37 2012 -0400 +++ b/src/share/classes/java/util/streams/ops/MapFilterKeysOp.java Sat Aug 25 19:03:55 2012 -0400 @@ -45,7 +45,7 @@ } @Override - public int getStreamState(int upstreamState) { + public int getStreamFlags(int upstreamState) { return upstreamState & ~(Stream.STATE_SIZED | Stream.STATE_UNKNOWN_MASK_V1); }
--- a/src/share/classes/java/util/streams/ops/MapFilterValuesOp.java Mon Aug 20 16:10:37 2012 -0400 +++ b/src/share/classes/java/util/streams/ops/MapFilterValuesOp.java Sat Aug 25 19:03:55 2012 -0400 @@ -45,7 +45,7 @@ } @Override - public int getStreamState(int upstreamState) { + public int getStreamFlags(int upstreamState) { return upstreamState & ~(Stream.STATE_SIZED | Stream.STATE_UNKNOWN_MASK_V1); }
--- a/src/share/classes/java/util/streams/ops/MapMapValuesOp.java Mon Aug 20 16:10:37 2012 -0400 +++ b/src/share/classes/java/util/streams/ops/MapMapValuesOp.java Sat Aug 25 19:03:55 2012 -0400 @@ -49,7 +49,7 @@ } @Override - public int getStreamState(int upstreamState) { + public int getStreamFlags(int upstreamState) { return upstreamState & ~(Stream.STATE_UNKNOWN_MASK_V1); }
--- a/src/share/classes/java/util/streams/ops/MapOp.java Mon Aug 20 16:10:37 2012 -0400 +++ b/src/share/classes/java/util/streams/ops/MapOp.java Sat Aug 25 19:03:55 2012 -0400 @@ -43,7 +43,7 @@ } @Override - public int getStreamState(int upstreamState) { + public int getStreamFlags(int upstreamState) { return upstreamState & ~(Stream.STATE_SORTED | Stream.STATE_UNIQUE | Stream.STATE_UNKNOWN_MASK_V1); }
--- a/src/share/classes/java/util/streams/ops/MapSortedOp.java Mon Aug 20 16:10:37 2012 -0400 +++ b/src/share/classes/java/util/streams/ops/MapSortedOp.java Sat Aug 25 19:03:55 2012 -0400 @@ -26,7 +26,6 @@ import java.util.*; import java.util.functions.Sink; -import java.util.streams.ParallelStreamable; import java.util.streams.StatefulSink; import java.util.streams.Stream; @@ -64,7 +63,7 @@ } @Override - public int getStreamState(int upstreamState) { + public int getStreamFlags(int upstreamState) { return upstreamState | Stream.STATE_SORTED; } @@ -101,7 +100,7 @@ private static <K,V> MapIterator<K,V> iterator(Iterator<Mapping<K, V>> iterator, Comparator<? super Mapping<K,V>> comparator) { Objects.requireNonNull(iterator); Objects.requireNonNull(comparator); - final PriorityQueue<? super Mapping<K,V>> pq = new PriorityQueue<>(DEFAULT_PRIORITY_QUEUE_SIZE, comparator); + final PriorityQueue<Mapping<K,V>> pq = new PriorityQueue<>(DEFAULT_PRIORITY_QUEUE_SIZE, comparator); while (iterator.hasNext()) { pq.add(iterator.next()); }
--- a/src/share/classes/java/util/streams/ops/MapSwapOp.java Mon Aug 20 16:10:37 2012 -0400 +++ b/src/share/classes/java/util/streams/ops/MapSwapOp.java Sat Aug 25 19:03:55 2012 -0400 @@ -47,7 +47,7 @@ } @Override - public int getStreamState(int upstreamState) { + public int getStreamFlags(int upstreamState) { return upstreamState & ~(Stream.STATE_UNIQUE | Stream.STATE_SORTED | Stream.STATE_UNKNOWN_MASK_V1); }
--- a/src/share/classes/java/util/streams/ops/MappedOp.java Mon Aug 20 16:10:37 2012 -0400 +++ b/src/share/classes/java/util/streams/ops/MappedOp.java Sat Aug 25 19:03:55 2012 -0400 @@ -46,7 +46,7 @@ } @Override - public int getStreamState(int upstreamState) { + public int getStreamFlags(int upstreamState) { return upstreamState & ~Stream.STATE_UNKNOWN_MASK_V1; }
--- a/src/share/classes/java/util/streams/ops/NoneMatchOp.java Mon Aug 20 16:10:37 2012 -0400 +++ b/src/share/classes/java/util/streams/ops/NoneMatchOp.java Sat Aug 25 19:03:55 2012 -0400 @@ -27,14 +27,13 @@ import java.util.Iterator; import java.util.Objects; import java.util.functions.Predicate; -import java.util.streams.ParallelStreamable; /** * NoneMatchOp * * @author Brian Goetz */ -public class NoneMatchOp<T> implements ShortCircuitEagerOp<T, Boolean> { +public class NoneMatchOp<T> implements ShortCircuitTerminalOp<T, Boolean> { private final Predicate<? super T> predicate; public NoneMatchOp(Predicate<? super T> predicate) { @@ -58,7 +57,7 @@ } @Override - public <V> Boolean computeParallel(ParallelStreamable<V> source, ParallelOpHelper<T, V> helper) { + public <V> Boolean computeParallel(ParallelOpHelper<T, V> helper) { throw new UnsupportedOperationException("nyi"); } }
--- a/src/share/classes/java/util/streams/ops/ParallelOp.java Mon Aug 20 16:10:37 2012 -0400 +++ b/src/share/classes/java/util/streams/ops/ParallelOp.java Sat Aug 25 19:03:55 2012 -0400 @@ -25,9 +25,9 @@ package java.util.streams.ops; import java.util.Iterator; +import java.util.Spliterator; import java.util.concurrent.ForkJoinTask; import java.util.functions.Sink; -import java.util.streams.ParallelStreamable; /** * ParallelOp @@ -38,9 +38,11 @@ * @author Brian Goetz */ public interface ParallelOp<T, U> { + public interface ParallelOpHelper<T, V> { - public int suggestDepth(ParallelStreamable<V> source); - public Sink<V> sink(Sink<T> sink); + public int suggestDepth(); + public Spliterator<V> spliterator(); + public Sink<V> sink(Sink<? super T> sink); public Iterator<T> iterator(); public<Z> Z invoke(ForkJoinTask<Z> task); } @@ -49,9 +51,8 @@ * Compute the result of the operation in parallel and return the result. * * @param <V> - * @param source element source * @param helper * @return result of the operation. */ - <V> U computeParallel(ParallelStreamable<V> source, ParallelOpHelper<T, V> helper); + <V> U computeParallel(ParallelOpHelper<T, V> helper); }
--- a/src/share/classes/java/util/streams/ops/ShortCircuitEagerOp.java Mon Aug 20 16:10:37 2012 -0400 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,54 +0,0 @@ -/* - * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved. - * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. - * - * This code is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License version 2 only, as - * published by the Free Software Foundation. Oracle designates this - * particular file as subject to the "Classpath" exception as provided - * by Oracle in the LICENSE file that accompanied this code. - * - * This code is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - * version 2 for more details (a copy is included in the LICENSE file that - * accompanied this code). - * - * You should have received a copy of the GNU General Public License version - * 2 along with this work; if not, write to the Free Software Foundation, - * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. - * - * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA - * or visit www.oracle.com if you need additional information or have any - * questions. - */ -package java.util.streams.ops; - -import java.util.Iterator; -import java.util.streams.*; - -/** - * ShortCircuitEagerOp - * - * @param <T> Type of elements. - * @param <U> Type of result. - * - * @author Brian Goetz - */ -public interface ShortCircuitEagerOp<T, U> extends ParallelOp<T, U> { - /** - * Evaluate some portion of the elements and return result. The - * implementation need not traverse all elements of the iterator. - * - * @param iterator sequence of elements to be evaluated. - * @return - */ - public U evaluate(Iterator<T> iterator); - - @Override - <V> U computeParallel(ParallelStreamable<V> source, ParallelOpHelper<T, V> helper) default { - // dumb default serial version - System.out.println("Using ShortCircuitEagerOp.computeParallel serial default"); - return evaluate(helper.iterator()); - } -}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/share/classes/java/util/streams/ops/ShortCircuitTerminalOp.java Sat Aug 25 19:03:55 2012 -0400 @@ -0,0 +1,60 @@ +/* + * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. Oracle designates this + * particular file as subject to the "Classpath" exception as provided + * by Oracle in the LICENSE file that accompanied this code. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ +package java.util.streams.ops; + +import java.util.Iterator; +import java.util.streams.*; + +/** + * ShortCircuitEagerOp + * + * @param <T> Type of elements. + * @param <U> Type of result. + * + * @author Brian Goetz + */ +public interface ShortCircuitTerminalOp<T, U> extends TerminalOp<T, U> { + /** + * Evaluate some portion of the elements and return result. The + * implementation need not traverse all elements of the iterator. + * + * @param iterator sequence of elements to be evaluated. + * @return + */ + public U evaluate(Iterator<T> iterator); + + @Override + StatefulSink<T, U> sink() default { throw new UnsupportedOperationException(); } + + @Override + boolean isShortCircuit() default { return true; } + + @Override + <V> U computeParallel(ParallelOpHelper<T, V> helper) default { + // dumb default serial version + System.out.println("Using ShortCircuitEagerOp.computeParallel serial default"); + return evaluate(helper.iterator()); + } +}
--- a/src/share/classes/java/util/streams/ops/SortedOp.java Mon Aug 20 16:10:37 2012 -0400 +++ b/src/share/classes/java/util/streams/ops/SortedOp.java Sat Aug 25 19:03:55 2012 -0400 @@ -114,7 +114,7 @@ } @Override - public int getStreamState(int upstreamState) { + public int getStreamFlags(int upstreamState) { return upstreamState | Stream.STATE_SORTED; } }
--- a/src/share/classes/java/util/streams/ops/StatefulOp.java Mon Aug 20 16:10:37 2012 -0400 +++ b/src/share/classes/java/util/streams/ops/StatefulOp.java Sat Aug 25 19:03:55 2012 -0400 @@ -12,7 +12,7 @@ * @param <V> Type of terminal state. * */ -public interface StatefulOp<T, U, V> extends ElementwiseOp<T,U>, ParallelOp<T, ParallelStreamable<U>> { +public interface StatefulOp<T, U, V> extends IntermediateOp<T,U>, ParallelOp<T, TreeUtils.Node<U>> { @Override public abstract StatefulSink<T, V> sink(Sink<? super U> sink); @@ -23,31 +23,15 @@ } @Override - <Z> ParallelStreamable<U> computeParallel(ParallelStreamable<Z> source, - ParallelOpHelper<T, Z> helper) default { + <Z> TreeUtils.Node<U> computeParallel(ParallelOpHelper<T, Z> helper) default { // dumb default serial implementation final StreamBuilder<U> sb = StreamBuilders.make(); StatefulSink<T, V> sSink = sink(sb); Sink<Z> sink = helper.sink(sSink); - Spliterator<Z> spliterator = source.spliterator(); + Spliterator<Z> spliterator = helper.spliterator(); sSink.begin(spliterator.getRemainingSizeIfKnown()); spliterator.forEach(sink); sSink.end(); - return new ParallelStreamable<U>() { - @Override - public int size() { - return sb.size(); - } - - @Override - public Iterator<U> iterator() { - return sb.iterator(); - } - - @Override - public Spliterator<U> spliterator() { - return Streams.spliterator(sb); - } - }; + return TreeUtils.node(sb); } }
--- a/src/share/classes/java/util/streams/ops/StatelessOp.java Mon Aug 20 16:10:37 2012 -0400 +++ b/src/share/classes/java/util/streams/ops/StatelessOp.java Sat Aug 25 19:03:55 2012 -0400 @@ -3,7 +3,7 @@ /** * StreamOp */ -public interface StatelessOp<T, U> extends ElementwiseOp<T, U> { +public interface StatelessOp<T, U> extends IntermediateOp<T, U> { @Override public boolean isStateful() default { return false;
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/share/classes/java/util/streams/ops/TerminalOp.java Sat Aug 25 19:03:55 2012 -0400 @@ -0,0 +1,71 @@ +/* + * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. Oracle designates this + * particular file as subject to the "Classpath" exception as provided + * by Oracle in the LICENSE file that accompanied this code. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ +package java.util.streams.ops; + +import java.util.Iterator; +import java.util.Spliterator; +import java.util.functions.Sink; +import java.util.streams.StatefulSink; + +/** + * EagerOp + * + * @author Brian Goetz + */ +public interface TerminalOp<T, U> extends ParallelOp<T, U> { + /** + * Evaluate some portion of the elements and return result. The + * implementation need not traverse all elements of the iterator. + * + * @param iterator sequence of elements to be evaluated. + * @return + */ + public U evaluate(Iterator<T> iterator) default { + StatefulSink<T,U> sink = sink(); + sink.begin(-1); + while (iterator.hasNext()) + sink.accept(iterator.next()); + return sink.end(); + } + + public StatefulSink<T, U> sink(); + + public boolean isShortCircuit() default { return false; } + + @Override + <V> U computeParallel(ParallelOpHelper<T, V> helper) default { + // dumb default serial version + StatefulSink<T, U> sSink = sink(); + Sink<V> vSink = helper.sink(sSink); + Spliterator<V> spliterator = helper.spliterator(); + sSink.begin(spliterator.getRemainingSizeIfKnown()); + // @@@ This is ridiculous! Integrate Sink/StatefulSink functionality in a less hacky way + if (sSink == vSink) + spliterator.forEach(vSink); + else + TreeUtils.forEach(spliterator, vSink); + return sSink.end(); + } +}
--- a/src/share/classes/java/util/streams/ops/ToArrayOp.java Mon Aug 20 16:10:37 2012 -0400 +++ b/src/share/classes/java/util/streams/ops/ToArrayOp.java Sat Aug 25 19:03:55 2012 -0400 @@ -25,7 +25,6 @@ package java.util.streams.ops; import java.util.Arrays; -import java.util.streams.ParallelStreamable; import java.util.streams.StatefulSink; /** @@ -33,14 +32,16 @@ * <p/> * @author Brian Goetz */ -public class ToArrayOp<T> implements EagerOp<T, Object[]> { +public class ToArrayOp<T> implements TerminalOp<T, Object[]> { private final static ToArrayOp<?> INSTANCE = new ToArrayOp<>(); + @SuppressWarnings("unchecked") public static <T> ToArrayOp<T> singleton() { return (ToArrayOp<T>) INSTANCE; } + @Override public StatefulSink<T, Object[]> sink() { return new StatefulSink<T, Object[]>() {
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/share/classes/java/util/streams/ops/TreeUtils.java Sat Aug 25 19:03:55 2012 -0400 @@ -0,0 +1,438 @@ +/* + * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. Oracle designates this + * particular file as subject to the "Classpath" exception as provided + * by Oracle in the LICENSE file that accompanied this code. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ +package java.util.streams.ops; + +import java.util.*; +import java.util.concurrent.RecursiveAction; +import java.util.concurrent.RecursiveTask; +import java.util.functions.Sink; +import java.util.streams.*; + +/** + * Collector + * + * @author Brian Goetz + */ +public class TreeUtils { + // @@@ This method is a hack, in that helper.sink() might serve up a stateful sink. + // This logic should be more tightly reflected in the helper rather than expecting every client to + // do instanceof and casts. + static<T> void forEach(Spliterator<T> spliterator, Sink<? super T> sink) { + if (sink instanceof StatefulSink) { + StatefulSink<T, ?> sSink = (StatefulSink<T, ?>) sink; + sSink.begin(spliterator.getRemainingSizeIfKnown()); + spliterator.forEach(sSink); + sSink.end(); + } + else { + spliterator.forEach(sink); + } + } + + public static<T, U> Node<T> collect(ParallelOp.ParallelOpHelper<T, U> helper, + boolean flattenLeaves, + boolean flattenTree) { + int depth = helper.suggestDepth(); + Spliterator<U> spliterator = helper.spliterator(); + int size = spliterator.getRemainingSizeIfKnown(); + boolean splitSizesKnown = false; + if (depth == 0) { + StreamBuilder<T> builder; + // @@@ splitSizesKnown should come from helper + // Need to account for SIZED flag from pipeline + if (size != -1 && splitSizesKnown) { + builder = StreamBuilders.makeFixed(size); + forEach(spliterator, helper.sink(builder)); + return node((T[]) builder.toArray()); + } + else { + builder = StreamBuilders.make(); + forEach(spliterator, helper.sink(builder)); + return node(builder); + } + } + else { + // @@@ splitSizesKnown should come from helper + // Need to account for SIZED flag from pipeline + if (size != -1 && spliterator.isPredictableSplits() && splitSizesKnown) { + T[] array = (T[]) new Object[size]; + helper.invoke(new SizedCollectorTask<>(depth, spliterator, helper, array, 0, size)); + return node(array); + } + else { + Node<T> node = helper.invoke(new CollectorTask<>(depth, spliterator, helper)); + if (flattenTree) { + T[] array = (T[]) new Object[node.size()]; + helper.invoke(new ToArrayTask<>(node, array, 0)); + return node(array); + } + return node; + } + } + } + + public static class CollectorOp<T> implements TerminalOp<T, TreeUtils.Node<T>> { + @Override + public StatefulSink<T, TreeUtils.Node<T>> sink() { + throw new UnsupportedOperationException(); + } + + private static CollectorOp<?> INSTANCE = new CollectorOp<>(); + + public static <T> CollectorOp<T> singleton() { + return (CollectorOp<T>) INSTANCE; + } + + @Override + public <V> TreeUtils.Node<T> computeParallel(ParallelOpHelper<T, V> helper) { + return TreeUtils.collect(helper, false, false); + } + } + + private static class CollectorTask<T, U> extends RecursiveTask<Node<U>> { + private final int depth; + private final Spliterator<T> spliterator; + private ParallelOp.ParallelOpHelper<U, T> helper; + + public CollectorTask(int depth, Spliterator<T> spliterator, ParallelOp.ParallelOpHelper<U, T> helper) { + this.depth = depth; + this.spliterator = spliterator; + this.helper = helper; + } + + @Override + protected Node<U> compute() { + if (depth == 0) { + // @@@ Usual comment about using fixed stream builders if we know enough to do so + StreamBuilder<U> builder = StreamBuilders.make(); + spliterator.forEach(helper.sink(builder)); + return node(builder); + } + else { + CollectorTask<T, U> left = new CollectorTask<>(depth-1, spliterator.split(), helper); + CollectorTask<T, U> right = new CollectorTask<>(depth-1, spliterator, helper); + right.fork(); + Node<U> leftResult = left.compute(); + Node<U> rightResult = right.join(); + return node(leftResult, rightResult); + } + } + } + + private static class SizedCollectorTask<T, U> extends RecursiveAction { + private final int depth; + private final Spliterator<T> spliterator; + private ParallelOp.ParallelOpHelper<U, T> helper; + private final U[] array; + private int offset; + private int length; + + private SizedCollectorTask(int depth, Spliterator<T> spliterator, ParallelOp.ParallelOpHelper<U, T> helper, U[] array, int offset, int length) { + this.depth = depth; + this.spliterator = spliterator; + this.helper = helper; + this.array = array; + this.offset = offset; + this.length = length; + } + + @Override + protected void compute() { + if (depth == 0) { + spliterator.forEach(helper.sink(Arrays.sink(array, offset, length))); + } + else { + Spliterator<T> split = spliterator.split(); + int leftSize = split.getRemainingSizeIfKnown(); + assert(leftSize != -1); + SizedCollectorTask<T, U> left = new SizedCollectorTask<>(depth-1, split, helper, array, offset, leftSize); + SizedCollectorTask<T, U> right = new SizedCollectorTask<>(depth-1, spliterator, helper, array, offset+leftSize, length-leftSize); + right.fork(); + left.compute(); + right.join(); + } + } + } + + private static class ToArrayTask<T> extends RecursiveAction { + private final T[] array; + private final Node<T> node; + private final int offset; + + private ToArrayTask(Node<T> node, T[] array, int offset) { + this.array = array; + this.node = node; + this.offset = offset; + } + + @Override + protected void compute() { + if (node instanceof InternalNode) { + InternalNode<T> n = (InternalNode<T>) node; + Node<T> left = n.left(); + ToArrayTask<T> leftTask = new ToArrayTask<>(left, array, offset); + ToArrayTask<T> rightTask = new ToArrayTask<>(n.right(), array, offset + left.size()); + rightTask.fork(); + leftTask.compute(); + rightTask.join(); + } + else { + node.copyTo(array, offset); + } + } + } + + public static interface Node<T> extends Traversable<T>, Splittable<T>, Sized { + void copyTo(T[] array, int offset); + } + + public static interface InternalNode<T> extends Node<T> { + Node<T> left(); + Node<T> right(); + } + + public static<T> Node<T> node(final T[] array) { + return new Node<T>() { + private final T[] data = array; + + @Override + public int size() { + return data.length; + } + + @Override + public void forEach(Sink<? super T> sink) { + for (T t : data) + sink.accept(t); + } + + @Override + public Iterator<T> iterator() { + return Arrays.iterator(data, 0, data.length); + } + + @Override + public Spliterator<T> spliterator() { + return Arrays.spliterator(data, 0, data.length); + } + + @Override + public void copyTo(T[] dest, int destOffset) { + System.arraycopy(data, 0, dest, destOffset, data.length); + } + + @Override + public String toString() { + return "ArrayNode" + Arrays.toString(data); + } + }; + } + + public static<T> Node<T> node(final StreamBuilder<T> stream) { + return new Node<T>() { + private final StreamBuilder<T> data = stream; + + @Override + public int size() { + return data.size(); + } + + @Override + public void forEach(Sink<? super T> sink) { + data.forEach(sink); + } + + @Override + public Iterator<T> iterator() { + return data.iterator(); + } + + @Override + public Spliterator<T> spliterator() { + return Arrays.spliterator((T[]) data.toArray()); + } + + @Override + public void copyTo(T[] array, int offset) { + int i = 0; + for (T t : this) + array[offset+(i++)] = t; + } + + @Override + public String toString() { + return "SBNode" + data.toString(); + } + }; + } + + public static<T> Node<T> node(Node<T> left, Node<T> right) { + return new InternalNodeImpl<>(left, right); + } + + private static class InternalNodeImpl<T> implements InternalNode<T> { + private final Node<T> left, right; + int size = 0; + + private InternalNodeImpl(Node<T> left, Node<T> right) { + this.left = left; + this.right = right; + } + + @Override + public Node<T> left() { + return left; + } + + @Override + public Node<T> right() { + return right; + } + + @Override + public int size() { + if (size == 0) + size = left.size() + right.size(); + return size; + } + + @Override + public void forEach(Sink<? super T> sink) { + left.forEach(sink); + right.forEach(sink); + } + + @Override + public Iterator<T> iterator() { + return concatenate(left.iterator(), right.iterator()); + } + + @Override + public Spliterator<T> spliterator() { + return new InternalNodeSpliterator<>(this); + } + + @Override + public void copyTo(T[] array, int offset) { + left.copyTo(array, offset); + right.copyTo(array, offset+left.size()); + } + + @Override + public String toString() { + return String.format("IntNode[%s,%s]", left.toString(), right.toString()); + } + + private static class InternalNodeSpliterator<T> implements Spliterator<T> { + private Node<T> cur; + private Iterator<T> iterator; + + private InternalNodeSpliterator(InternalNodeImpl<T> cur) { + this.cur = cur; + } + + private Iterator<T> iterator() { + if (iterator == null) + iterator = cur.iterator(); + return iterator; + } + + @Override + public Spliterator<T> split() { + if (iterator != null) + throw new IllegalStateException(); + if (cur instanceof InternalNodeImpl) { + InternalNodeImpl<T> internalNode = (InternalNodeImpl<T>) cur; + Spliterator<T> ret = internalNode.left.spliterator(); + cur = internalNode.right; + return ret; + } + else + return Streams.emptySpliterator(); + } + + @Override + public void forEach(Sink<? super T> sink) { + if (iterator == null) { + cur.forEach(sink); + iterator = Collections.emptyIterator(); + } + else { + while (iterator.hasNext()) + sink.accept(iterator.next()); + } + } + + @Override + public int getRemainingSizeIfKnown() { + return (iterator == null) ? cur.size() : -1; + } + + @Override + public boolean isPredictableSplits() { + return true; + } + + @Override + public boolean hasNext() { + return iterator().hasNext(); + } + + @Override + public T next() { + return iterator().next(); + } + } + } + + public static<T> Iterator<T> concatenate(final Iterator<? extends T> left, final Iterator<? extends T> right) { + return new Iterator<T>() { + private Iterator<? extends T> it = left; + private boolean switched = false; + + @Override + public boolean hasNext() { + if (it.hasNext()) { + return true; + } else if (switched) { + return false; + } else { + switched = true; + it = right; + return it.hasNext(); + } + } + + @Override + public T next() { + if (hasNext()) { + return it.next(); + } else { + throw new NoSuchElementException(); + } + } + }; + } +}
--- a/src/share/classes/java/util/streams/ops/UniqOp.java Mon Aug 20 16:10:37 2012 -0400 +++ b/src/share/classes/java/util/streams/ops/UniqOp.java Sat Aug 25 19:03:55 2012 -0400 @@ -26,7 +26,6 @@ import java.util.*; import java.util.functions.Sink; -import java.util.streams.ParallelStreamable; import java.util.streams.StatefulSink; import java.util.streams.Stream; @@ -48,7 +47,7 @@ } @Override - public int getStreamState(int upstreamState) { + public int getStreamFlags(int upstreamState) { // If the upstream is sorted, we need only cache last element // If the upstream is unique, this is a no-op return upstreamState & ~(Stream.STATE_SIZED | Stream.STATE_UNKNOWN_MASK_V1) | Stream.STATE_UNIQUE;
--- a/test-ng/build.xml Mon Aug 20 16:10:37 2012 -0400 +++ b/test-ng/build.xml Sat Aug 25 19:03:55 2012 -0400 @@ -37,7 +37,7 @@ <target name="test" depends="test-compile" > <echo>Results at: file:${test.reports.dir}/index.html</echo> - <testng classpathref="test.class.path" outputdir="${test.reports.dir}"> + <testng classpathref="test.class.path" outputdir="${test.reports.dir}" > <classfileset dir="${test.classes.dir}" includes="**/${test.pattern}.class"/> <jvmarg value="-ea" /> <jvmarg value="-esa" />
--- a/test-ng/tests/org/openjdk/tests/java/lang/ThreadLocalFactoryTest.java Mon Aug 20 16:10:37 2012 -0400 +++ b/test-ng/tests/org/openjdk/tests/java/lang/ThreadLocalFactoryTest.java Sat Aug 25 19:03:55 2012 -0400 @@ -77,7 +77,7 @@ public ThreadLocalFactoryTest() { } - public void test() throws InterruptedException, Exception { + public void test() throws Exception { int threadCount = 500; Thread th[] = new Thread[threadCount]; final boolean visited[] = new boolean[threadCount]; @@ -87,11 +87,11 @@ th[i] = new Thread() { @Override public void run() { - int threadId = ((Integer) (threadLocal.get())).intValue(); + int threadId = threadLocal.get(); assertFalse(visited[threadId], "visited[" + threadId + "]=" + visited[threadId]); visited[threadId] = true; // check the get() again - int secondCheckThreadId = ((Integer) (threadLocal.get())).intValue(); + int secondCheckThreadId = threadLocal.get(); assertEquals( secondCheckThreadId, threadId ); Thread.yield(); }
--- a/test-ng/tests/org/openjdk/tests/java/lang/ThreadLocalTest.java Mon Aug 20 16:10:37 2012 -0400 +++ b/test-ng/tests/org/openjdk/tests/java/lang/ThreadLocalTest.java Sat Aug 25 19:03:55 2012 -0400 @@ -43,7 +43,7 @@ * Test of get method, of class ThreadLocal. */ public void testGet() { - ThreadLocal instance = new ThreadLocal(); + ThreadLocal<?> instance = new ThreadLocal<>(); Object expResult = null; Object result = instance.get(); assertEquals(result, expResult); @@ -54,7 +54,7 @@ */ public void testSet() { String initialValue = "initial value"; - ThreadLocal instance = new ThreadLocal(); + ThreadLocal<String> instance = new ThreadLocal<>(); instance.set(initialValue); assertEquals( instance.get(), initialValue ); } @@ -64,7 +64,7 @@ */ public void testRemove() { String putThisIn = "value was set"; - ThreadLocal instance = new ThreadLocal(); + ThreadLocal<String> instance = new ThreadLocal<>(); instance.set( putThisIn ); instance.remove(); assertNull( instance.get() ); @@ -75,7 +75,7 @@ */ public void testInitWithFactory() { String whatDoYouExpect = "OneWithEverything"; - ThreadLocal<String> hotdogForTheMonk = new ThreadLocal( new StringFactory( whatDoYouExpect )); + ThreadLocal<String> hotdogForTheMonk = new ThreadLocal<>(new StringFactory( whatDoYouExpect )); assertEquals( hotdogForTheMonk.get(), hotdogForTheMonk.get() ); }
--- a/test-ng/tests/org/openjdk/tests/java/util/FillableStringTest.java Mon Aug 20 16:10:37 2012 -0400 +++ b/test-ng/tests/org/openjdk/tests/java/util/FillableStringTest.java Sat Aug 25 19:03:55 2012 -0400 @@ -37,7 +37,8 @@ public class FillableStringTest { public Stream<String> generate() { return Arrays.asList("one", "two", "three", "four", "five", "six") - .filter(s -> s.length() > 3) + .stream() + .filter(s->s.length() > 3) .map(s -> s.toUpperCase()); }
--- a/test-ng/tests/org/openjdk/tests/java/util/LambdaTestHelpers.java Mon Aug 20 16:10:37 2012 -0400 +++ b/test-ng/tests/org/openjdk/tests/java/util/LambdaTestHelpers.java Sat Aug 25 19:03:55 2012 -0400 @@ -26,7 +26,6 @@ import java.util.*; import java.util.functions.*; -import java.util.streams.ParallelStreamable; import java.util.streams.StatefulSink; import static org.testng.Assert.assertEquals; @@ -106,15 +105,15 @@ return list; } - public static void assertCountSum(Iterable<Integer> it, int count, int sum) { + public static void assertCountSum(Iterable<? super Integer> it, int count, int sum) { assertCountSum(it.iterator(), count, sum); } - public static void assertCountSum(Iterator<Integer> it, int count, int sum) { + public static void assertCountSum(Iterator<? super Integer> it, int count, int sum) { int c = 0; int s = 0; while (it.hasNext()) { - int i = it.next(); + int i = (Integer) it.next(); c++; s += i; } @@ -211,24 +210,6 @@ assertContents(one, two); } - static<T> void assertContents(ParallelStreamable<T> pi, Iterable<T> list) { - Iterator<T> pI = pi.sequential().iterator(); - Iterator<T> lI = list.iterator(); - - while (lI.hasNext()) { - assertTrue(pI.hasNext()); - T pT = pI.next(); - T lT = lI.next(); - assertEquals(pT, lT); - } - assertTrue(!pI.hasNext()); - - for (int depth=0; depth<6; depth++) { - Iterable<Iterable<T>> splits = split(pi, depth); - assertSplitContents(splits, list); - } - } - static <T> void assertSplitContents(Iterable<Iterable<T>> splits, Iterable<T> list) { Iterator<Iterable<T>> mI = splits.iterator(); Iterator<T> pI = null; @@ -261,10 +242,6 @@ } } - static <T> Iterable<Iterable<T>> split(ParallelStreamable<T> pi, int depth) { - return splitHelper(pi.spliterator(), depth, new ArrayList<Iterable<T>>()); - } - private static <T> Iterable<Iterable<T>> splitHelper(Spliterator<T> s, int depth, List<Iterable<T>> iterables) { if (depth == 0) { List<T> list = new ArrayList<>();
--- a/test-ng/tests/org/openjdk/tests/java/util/StringJoinerTest.java Mon Aug 20 16:10:37 2012 -0400 +++ b/test-ng/tests/org/openjdk/tests/java/util/StringJoinerTest.java Sat Aug 25 19:03:55 2012 -0400 @@ -334,10 +334,9 @@ list.add(TWO); list.add(THREE); - StringJoiner sj = list.into(new StringJoiner(",", "{", "}")); - Iterable<Character> ic = sj.asChars(); + StringJoiner sj = list.stream().into(new StringJoiner(",", "{", "}")); + Iterator<Character> it = sj.asChars().iterator(); String result = new String(); - Iterator<Character> it = ic.iterator(); while (it.hasNext()) { result += it.next(); } @@ -350,13 +349,19 @@ sj.add(ONE); sj.add(TWO); - Iterable<Integer> codePoints = sj.asCodePoints(); - long count = Iterables.count(codePoints); + Iterator<Integer> codePoints = sj.asCodePoints().iterator(); + long count=0; + while (codePoints.hasNext()) { + codePoints.next(); + ++count; + } String expected = "{"+ONE+"-"+TWO+"}"; assertEquals(count, expected.length(), "Number of codePoints in result is not correct"); int i = 0; - for (Integer cp : codePoints) { + codePoints = sj.asCodePoints().iterator(); + while (codePoints.hasNext()) { + Integer cp = codePoints.next(); assertEquals(cp.intValue(), expected.codePointAt(i), "Code point at i=" + i); i++; }
--- a/test-ng/tests/org/openjdk/tests/java/util/concurrent/AtomicReferenceTest.java Mon Aug 20 16:10:37 2012 -0400 +++ b/test-ng/tests/org/openjdk/tests/java/util/concurrent/AtomicReferenceTest.java Sat Aug 25 19:03:55 2012 -0400 @@ -45,24 +45,17 @@ * Test of updateAndGet method, of class AtomicReference. */ public void testUpdateAndGet() { - System.out.println("getAndUpdate"); - UnaryOperator<Integer> op = (x -> x + 2); - AtomicReference instance = new AtomicReference(new Integer(3)); - Object expResult = 4; - Object result = instance.updateAndGet( op ); - assertEquals(result, expResult); + AtomicReference<Integer> instance = new AtomicReference<>(3); + int result = instance.updateAndGet(x -> x + 2); + assertEquals(result, 5); } /** * Test of getAndUpdate method, of class AtomicReference. */ public void testGetAndUpdate() { - System.out.println("getAndUpdate"); - UnaryOperator<Integer> op = (x -> x + 3); - AtomicReference instance = new AtomicReference(new Integer(3)); - Object expResult = 3; - Object result = instance.getAndUpdate( op ); - assertEquals(result, expResult); + AtomicReference<Integer> instance = new AtomicReference<>(3); + int result = instance.getAndUpdate(x -> x + 3); + assertEquals(result, 3); } - }
--- a/test-ng/tests/org/openjdk/tests/java/util/functions/UnaryOperatorTest.java Mon Aug 20 16:10:37 2012 -0400 +++ b/test-ng/tests/org/openjdk/tests/java/util/functions/UnaryOperatorTest.java Sat Aug 25 19:03:55 2012 -0400 @@ -48,13 +48,13 @@ */ public void testOperate() { Integer operand = 3; - UnaryOperator instance = new IncByTwo(); + UnaryOperator<Object> instance = new IncByTwo(); Integer expResult = 5; - Integer result = (Integer)instance.operate(operand); + Integer result = (Integer) instance.operate(operand); assertEquals(result, expResult); } - public class IncByTwo implements UnaryOperator { + public class IncByTwo implements UnaryOperator<Object> { @Override public Object operate(Object operand) {
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/test-ng/tests/org/openjdk/tests/java/util/streams/StreamTestDataProvider.java Sat Aug 25 19:03:55 2012 -0400 @@ -0,0 +1,81 @@ +/* + * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. Oracle designates this + * particular file as subject to the "Classpath" exception as provided + * by Oracle in the LICENSE file that accompanied this code. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ +package org.openjdk.tests.java.util.streams; + +import org.openjdk.tests.java.util.LambdaTestHelpers; +import org.testng.annotations.DataProvider; + +/** + * StreamTestDataProvider + * + * @author Brian Goetz + */ +public class StreamTestDataProvider { + private static final Integer[] to0 = new Integer[0]; + private static final Integer[] to1 = new Integer[1]; + private static final Integer[] to10 = new Integer[10]; + private static final Integer[] to100 = new Integer[100]; + private static final Integer[] to1000 = new Integer[1000]; + private static final Integer[] reversed = new Integer[100]; + private static final Integer[] ones = new Integer[100]; + private static final Integer[] twice = new Integer[200]; + private static final Integer[] pseudoRandom; + + static { + Integer[][] arrays = {to0, to1, to10, to100, to1000}; + for (Integer[] arr : arrays) { + for (int i = 0; i < arr.length; i++) { + arr[i] = i; + } + } + for (int i = 0; i < reversed.length; i++) { + reversed[i] = reversed.length - i; + } + for (int i = 0; i < ones.length; i++) { + ones[i] = 1; + } + System.arraycopy(to100, 0, twice, 0, to100.length); + System.arraycopy(to100, 0, twice, to100.length, to100.length); + pseudoRandom = new Integer[LambdaTestHelpers.LONG_STRING.length()]; + for (int i = 0; i < LambdaTestHelpers.LONG_STRING.length(); i++) { + pseudoRandom[i] = (int) LambdaTestHelpers.LONG_STRING.charAt(i); + } + } + + // Return an array of Integer[] + @DataProvider(name = "opArrays") + public static Object[][] makeOpArrays() { + return new Object[][]{ + {"array to zero", to0}, + {"array to one", to1}, + {"array to ten", to10}, + {"array to 100", to100}, + {"array to 1000", to1000}, + {"array 100 ones", ones}, + {"array to 100 twice", twice}, + {"array reversed", reversed}, + {"array pseudorandom", pseudoRandom}}; + } +}
--- a/test-ng/tests/org/openjdk/tests/java/util/streams/ops/CumulateOpTest.java Mon Aug 20 16:10:37 2012 -0400 +++ b/test-ng/tests/org/openjdk/tests/java/util/streams/ops/CumulateOpTest.java Sat Aug 25 19:03:55 2012 -0400 @@ -24,6 +24,7 @@ */ package org.openjdk.tests.java.util.streams.ops; +import org.openjdk.tests.java.util.streams.StreamTestDataProvider; import org.testng.annotations.Test; import java.util.streams.ops.CumulateOp; @@ -36,6 +37,7 @@ * * @author Brian Goetz */ +@Test public class CumulateOpTest extends StreamOpTestCase { public void testRawIterator() { assertCountSum(CumulateOp.iterator(countTo(0).iterator(), rPlus), 0, 0); @@ -44,7 +46,7 @@ assertContents(CumulateOp.iterator(countTo(5).iterator(), rMin), 1, 1, 1, 1, 1); } - @Test(dataProvider = "opArrays") + @Test(dataProvider = "opArrays", dataProviderClass = StreamTestDataProvider.class) public void testOps(String name, Integer[] data) { assertConsistentOpBehavior(data, new CumulateOp<>(rPlus), l -> l); assertConsistentOpBehavior(data, new CumulateOp<>(rMin), l -> l);
--- a/test-ng/tests/org/openjdk/tests/java/util/streams/ops/FilterOpTest.java Mon Aug 20 16:10:37 2012 -0400 +++ b/test-ng/tests/org/openjdk/tests/java/util/streams/ops/FilterOpTest.java Sat Aug 25 19:03:55 2012 -0400 @@ -24,6 +24,7 @@ */ package org.openjdk.tests.java.util.streams.ops; +import org.openjdk.tests.java.util.streams.StreamTestDataProvider; import org.testng.annotations.Test; import java.util.streams.ops.FilterOp; @@ -46,7 +47,7 @@ assertCountSum(FilterOp.iterator(FilterOp.iterator(countTo(10).iterator(), pEven), pOdd), 0, 0); } - @Test(dataProvider = "opArrays") + @Test(dataProvider = "opArrays", dataProviderClass = StreamTestDataProvider.class) public void testOps(String name, Integer[] data) { assertConsistentOpBehavior(data, new FilterOp<>(pTrue), l -> l); assertConsistentOpBehavior(data, new FilterOp<>(pFalse), l -> 0);
--- a/test-ng/tests/org/openjdk/tests/java/util/streams/ops/FlatMapOpTest.java Mon Aug 20 16:10:37 2012 -0400 +++ b/test-ng/tests/org/openjdk/tests/java/util/streams/ops/FlatMapOpTest.java Sat Aug 25 19:03:55 2012 -0400 @@ -24,12 +24,13 @@ */ package org.openjdk.tests.java.util.streams.ops; +import org.openjdk.tests.java.util.streams.StreamTestDataProvider; import org.testng.annotations.Test; import java.util.Arrays; import java.util.functions.FlatMapper; import java.util.functions.Sink; -import java.util.streams.Streamable; +import java.util.streams.Stream; import java.util.streams.ops.FlatMapOp; import static org.openjdk.tests.java.util.LambdaTestHelpers.*; @@ -39,6 +40,7 @@ * * @author Brian Goetz */ +@Test public class FlatMapOpTest extends StreamOpTestCase { private static final FlatMapper<Integer, Integer> mfIntToBits = new FlatMapper<Integer, Integer>() { @Override @@ -55,8 +57,8 @@ public void testFlatMap() { String[] stringsArray = {"hello", "there", "", "yada"}; - Streamable<String> strings = Arrays.asList(stringsArray); - assertConcat(strings.flatMap(flattenChars), "hellothereyada"); + Stream<String> strings = Arrays.asList(stringsArray).stream(); + assertConcat(strings.flatMap(flattenChars).iterator(), "hellothereyada"); assertCountSum(FlatMapOp.iterator(countTo(10).iterator(), mfId), 10, 55); assertCountSum(FlatMapOp.iterator(countTo(10).iterator(), mfNull), 0, 0); @@ -66,7 +68,7 @@ assertConsistentOpBehavior(new String[] { LONG_STRING }, new FlatMapOp<>(flattenChars), null); } - @Test(dataProvider = "opArrays") + @Test(dataProvider = "opArrays", dataProviderClass = StreamTestDataProvider.class) public void testOps(String name, Integer[] data) { assertConsistentOpBehavior(data, new FlatMapOp<>(mfId), l -> l); assertConsistentOpBehavior(data, new FlatMapOp<>(mfNull), l -> 0);
--- a/test-ng/tests/org/openjdk/tests/java/util/streams/ops/GroupByOpTest.java Mon Aug 20 16:10:37 2012 -0400 +++ b/test-ng/tests/org/openjdk/tests/java/util/streams/ops/GroupByOpTest.java Sat Aug 25 19:03:55 2012 -0400 @@ -24,24 +24,18 @@ */ package org.openjdk.tests.java.util.streams.ops; -import java.util.Arrays; -import java.util.List; +import org.openjdk.tests.java.util.streams.StreamTestDataProvider; +import org.testng.annotations.Test; + +import java.util.Collection; +import java.util.Iterator; import java.util.Map; import java.util.functions.Mappers; -import java.util.streams.SizedStreamable; +import java.util.streams.Stream; import java.util.streams.Streamable; -import org.testng.annotations.Test; - -import java.util.streams.ops.FilterOp; import java.util.streams.ops.GroupByOp; -import java.util.streams.ops.MapOp; import static org.openjdk.tests.java.util.LambdaTestHelpers.*; -import static org.openjdk.tests.java.util.LambdaTestHelpers.pEven; -import static org.openjdk.tests.java.util.LambdaTestHelpers.pOdd; - -import static org.testng.Assert.assertFalse; -import static org.testng.Assert.assertTrue; import static org.testng.Assert.assertEquals; /** @@ -54,15 +48,22 @@ public void testRawIterator() { GroupByOp<Integer, Boolean> grouping = new GroupByOp<>(Mappers.forPredicate(pEven, true, false)); - Map<Boolean,SizedStreamable<Integer>> result = iteratorToStatefulSink(countTo(10).iterator(), grouping.sink()); + Map<Boolean,Streamable<Integer>> result = iteratorToStatefulSink(countTo(10).iterator(), grouping.sink()); assertEquals(2, result.keySet().size()); - for(SizedStreamable<Integer> group : result.values()) { - assertEquals(5, group.size()); + for(Streamable<Integer> group : result.values()) { + int count = 0; + Stream<Integer> stream = group.stream(); + Iterator<Integer> it = stream.iterator(); + while (it.hasNext()) { + it.next(); + ++count; + } + assertEquals(5, count); } } - @Test(dataProvider = "opArrays") + @Test(dataProvider = "opArrays", dataProviderClass = StreamTestDataProvider.class) public void testOps(String name, Integer[] data) { assertConsistentOpBehavior(data, new GroupByOp<>(mId)); assertConsistentOpBehavior(data, new GroupByOp<>(mZero));
--- a/test-ng/tests/org/openjdk/tests/java/util/streams/ops/MapOpTest.java Mon Aug 20 16:10:37 2012 -0400 +++ b/test-ng/tests/org/openjdk/tests/java/util/streams/ops/MapOpTest.java Sat Aug 25 19:03:55 2012 -0400 @@ -24,6 +24,7 @@ */ package org.openjdk.tests.java.util.streams.ops; +import org.openjdk.tests.java.util.streams.StreamTestDataProvider; import org.testng.annotations.Test; import java.util.streams.ops.FilterOp; @@ -48,7 +49,7 @@ assertCountSum(MapOp.iterator(MapOp.iterator(countTo(10).iterator(), mDoubler), mDoubler), 10, 220); } - @Test(dataProvider = "opArrays") + @Test(dataProvider = "opArrays", dataProviderClass = StreamTestDataProvider.class) public void testOps(String name, Integer[] data) { assertConsistentOpBehavior(data, new MapOp<>(mId), l -> l); assertConsistentOpBehavior(data, new MapOp<>(mZero), l -> l);
--- a/test-ng/tests/org/openjdk/tests/java/util/streams/ops/SortedOpTest.java Mon Aug 20 16:10:37 2012 -0400 +++ b/test-ng/tests/org/openjdk/tests/java/util/streams/ops/SortedOpTest.java Sat Aug 25 19:03:55 2012 -0400 @@ -24,12 +24,10 @@ */ package org.openjdk.tests.java.util.streams.ops; +import org.openjdk.tests.java.util.streams.StreamTestDataProvider; import org.testng.annotations.Test; import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.streams.SequentialPipeline; import java.util.streams.ops.SortedOp; import static org.openjdk.tests.java.util.LambdaTestHelpers.*; @@ -47,10 +45,10 @@ assertCountSum(SortedOp.iterator(countTo(10).iterator(), cInteger.reverse()), 10, 55); } - @Test(dataProvider = "opArrays") + @Test(dataProvider = "opArrays", dataProviderClass = StreamTestDataProvider.class) public void testOps(String name, Integer[] data) { SortedOp<Integer> op = new SortedOp<>(cInteger); - assertSorted(new SequentialPipeline<>(Arrays.asList(data), op).into(new ArrayList<Integer>()).iterator()); + assertSorted(seq(data, op).into(new ArrayList<Integer>()).iterator()); assertConsistentOpBehavior(data, op, l->l); assertConsistentOpBehavior(data, new SortedOp<>(cInteger.reverse()), l -> l); }
--- a/test-ng/tests/org/openjdk/tests/java/util/streams/ops/StreamOpTestCase.java Mon Aug 20 16:10:37 2012 -0400 +++ b/test-ng/tests/org/openjdk/tests/java/util/streams/ops/StreamOpTestCase.java Sat Aug 25 19:03:55 2012 -0400 @@ -30,6 +30,10 @@ import org.testng.annotations.Test; import java.util.*; +import java.util.concurrent.ForkJoinTask; +import java.util.concurrent.ForkJoinUtils; +import java.util.functions.BiPredicate; +import java.util.functions.Factory; import java.util.functions.Mapper; import java.util.functions.Sink; import java.util.streams.*; @@ -47,186 +51,205 @@ @Test public abstract class StreamOpTestCase { + protected<T> Stream<T> seq(T[] data) { + return Streams.stream(Arrays.iterator(data), data.length); + } + + protected<T> Stream<T> par(T[] data) { + return Streams.parallel(Arrays.spliterator(data), data.length); + } + + protected<T, U> Stream<U> seq(T[] data, IntermediateOp<T,U> op) { + return ((LinearPipeline) Streams.stream(Arrays.iterator(data), data.length)).pipeline(op); + } + + protected<T, U> Stream<T> par(T[] data, IntermediateOp<T,U> op) { + return ((LinearPipeline) Streams.parallel(Arrays.spliterator(data), data.length)).pipeline(op); + } + + protected<T, U> U seq(T[] data, TerminalOp<T,U> op) { + return ((LinearPipeline<?, T>) Streams.stream(Arrays.iterator(data), data.length)).pipeline(op); + } + + protected<T, U> U par(T[] data, TerminalOp<T,U> op) { + return ((LinearPipeline<?, T>) Streams.parallel(Arrays.spliterator(data), data.length)).pipeline(op); + } + + protected<T> Iterator<T> iterator(T[] data) { + return Arrays.iterator(data); + } + + protected<T> void forEach(T[] data, Sink<? super T> sink) { + for (T t : data) + sink.accept(t); + } + protected <T, U> void assertConsistentOpBehavior(T[] data, - ElementwiseOp<T, U> op, - Mapper<Integer, Integer> lengthMapper) { - Streamable<T> seq = Arrays.asList(data); - ParallelStreamable<T> par = Arrays.parallel(data); - int length = data.length; - if (lengthMapper != null) { - length = lengthMapper.map(data.length); - } + IntermediateOp<T, U> op, + Mapper<Integer, Integer> lengthMapper) { - ArraySink<U> sink1 = new ArraySink<>(length); - ArraySink<U> sink2 = new ArraySink<>(length); - ArraySink<U> sink3 = new ArraySink<>(length); - ArraySink<U> sink4 = new ArraySink<>(length); - ArraySink<U> sink5 = new ArraySink<>(length); - ArraySink<U> sink6 = new ArraySink<>(length); + int length = (lengthMapper == null) ? 100 : lengthMapper.map(data.length); + + ArraySink<U> refResult = new ArraySink<>(length); // First pass -- grab an iterator and wrap it - Iterator<U> it = op.iterator(seq.iterator()); + Iterator<U> it = op.iterator(iterator(data)); while (it.hasNext()) { - sink1.accept(it.next()); + refResult.accept(it.next()); } // If a length mapper has been specified, make sure it is right if (lengthMapper != null) { - Assert.assertEquals(sink1.size(), (int)lengthMapper.map(data.length)); + Assert.assertEquals(refResult.size(), (int)lengthMapper.map(data.length)); } // Second pass -- create a sink and wrap it { + ArraySink<U> sink2 = new ArraySink<>(length); Sink<T> wrapped = op.sink(sink2); if (wrapped instanceof StatefulSink) { StatefulSink<T, ?> stateful = (StatefulSink<T, ?>)wrapped; stateful.begin(-1); - seq.forEach(stateful); + forEach(data, stateful); stateful.end(); } else { - seq.forEach(wrapped); + forEach(data, wrapped); } - assertEquals(sink1, sink2); + assertEquals(refResult, sink2); } // Third pass -- wrap with SequentialPipeline.op, and iterate in push mode - new SequentialPipeline<>(seq, op).forEach(sink3); - assertEquals(sink1, sink3); + { + ArraySink<U> sink3 = new ArraySink<>(length); + seq(data, op).forEach(sink3); + assertEquals(refResult, sink3); + } // Wrap with SequentialPipeline.op, and iterate in pull mode - Iterator<U> seqIter = new SequentialPipeline<>(seq, op); - while (seqIter.hasNext()) { - sink4.accept(seqIter.next()); + { + ArraySink<U> sink4 = new ArraySink<>(length); + Iterator<U> seqIter = seq(data, op).iterator(); + while (seqIter.hasNext()) { + sink4.accept(seqIter.next()); + } + assertEquals(refResult, sink4); } - assertEquals(sink1, sink4); // Wrap with SequentialPipeline.op, and iterate in mixed mode - Stream<U> mixedIter = new SequentialPipeline<>(seq, op); - if (mixedIter.hasNext()) { - sink5.accept(mixedIter.next()); + { + ArraySink<U> sink5 = new ArraySink<>(length); + Stream<U> stream = seq(data, op); + Iterator<U> iter = stream.iterator(); + if (iter.hasNext()) { + sink5.accept(iter.next()); + } + stream.forEach(sink5); + assertEquals(refResult, sink5); } - mixedIter.forEach(sink5); - assertEquals(sink1, sink5); + + // For stateful ops, validate Node contents with fake ParallelHelper + { + if (op.isStateful()) { + StatefulOp<T, U, ?> sop = (StatefulOp<T, U, ?>) op; + ArraySink<U> sink6a = new ArraySink<>(length); + TreeUtils.Node<U> result = sop.computeParallel(new DummyParallelOpHelper<>(data)); + for (U u : result) + sink6a.accept(u); + assertEquals(refResult, sink6a); + } + } // Wrap with ParallelPipeline.op.sequential - if (op instanceof StatelessOp) { - new ParallelPipeline<>(par, (StatelessOp<T, U>)op).sequential().forEach(sink6); - } else if (op instanceof StatefulOp) { - SizedStreamable<U> sequential = ParallelPipeline.wrap(par).pipeline((StatefulOp<T, U, ?>)op).sequential(); - sequential.forEach(sink6); - } else { - fail("Op neither stateful nor stateless"); + { + ArraySink<U> sink6 = new ArraySink<>(length); + ((LinearPipeline) par(data, op)).sequential().forEach(sink6); + assertEquals(refResult, sink6); } - assertEquals(sink1, sink6); + + // Wrap with ParallelPipeline.op.toArray + { + ArraySink<U> sink7 = new ArraySink<>(length); + Object[] array = par(data, op).toArray(); + for (Object t : array) + sink7.accept((U) t); + assertEquals(refResult, sink7); + } + + // Wrap with ParallelPipeline.op.into + { + ArraySink<U> sink8 = new ArraySink<>(length); + ArrayList list = (ArrayList) ((LinearPipeline) par(data, op)).sequential().into(new ArrayList()); + for (Object u : list) + sink8.accept((U) u); + assertEquals(refResult, sink8); + } // @@@ Extend parallel testing to include // - More ways to get a PSS: Arrays.asParallel, Arrays.asList().parallel(), new ArrayList().parallel() - // - More ways to iterate the PSS: .sequential().into(), iterate result of op + // - More ways to iterate the PSS: iterate result of op + // Extends testing to test whether computation happens in- or out-of-thread } - protected <T, U> void assertConsistentOpBehavior(T[] data, StatelessOp<T, U> op) { + protected <T, U> void assertConsistentOpBehavior(T[] data, IntermediateOp<T, U> op) { assertConsistentOpBehavior(data, op, null); } - protected <T, U> void assertConsistentOpBehavior(T[] data, EagerOp<T, U> op) { - SizedStreamable<T> seq = Arrays.asList(data); - ParallelStreamable<T> par = Arrays.parallel(data); - int length = data.length; + protected <T, U> void assertConsistentOpBehavior(T[] data, TerminalOp<T, U> op) { + assertConsistentOpBehavior(data, op, (u, v) -> u.equals(v)); + } + + protected <T, U> void assertConsistentOpBehavior(T[] data, ShortCircuitTerminalOp<T, U> op) { + // First pass -- wrap Iterator + U answer1 = op.evaluate(iterator(data)); + + // Second pass -- wrap with SequentialPipeline.op + Assert.assertEquals(answer1, seq(data, op)); + + // Fourth pass -- wrap with ParallelPipeline.op + Assert.assertEquals(answer1, par(data, op)); + } + + // @@@ Merge these two into a single method + + protected <T, U> void assertConsistentOpBehavior(T[] data, TerminalOp<T, U> op, BiPredicate<U,U> equalator) { // First pass -- create a sink and evaluate, with no size advice StatefulSink<T, U> sink = op.sink(); sink.begin(-1); - seq.forEach(sink); + forEach(data, sink); U answer1 = sink.end(); - // Second pass -- create a sink and evaluate, with no size advice + // Second pass -- create a sink and evaluate, with size advice StatefulSink<T, U> sink2 = op.sink(); - sink2.begin(seq.size()); - seq.forEach(sink2); + sink2.begin(data.length); + forEach(data, sink2); U answer2 = sink2.end(); - Assert.assertEquals(answer1, answer2); + Assert.assertTrue(equalator.eval(answer1, answer2)); // Third pass -- wrap with SequentialPipeline.op - U answer3 = SequentialPipeline.wrap(seq).pipeline(op); - Assert.assertEquals(answer1, answer3); + U answer3 = seq(data, op); + Assert.assertTrue(equalator.eval(answer1, answer3)); // Fourth pass -- wrap with ParallelPipeline.op - U answer4 = ParallelPipeline.wrap(par).pipeline(op); - Assert.assertEquals(answer1, answer4); - } - - protected <T, U> void assertConsistentOpBehavior(T[] data, ShortCircuitEagerOp<T, U> op) { - SizedStreamable<T> seq = Arrays.asList(data); - ParallelStreamable<T> par = Arrays.parallel(data); - int length = data.length; - - // First pass -- wrap Iterator - U answer1 = op.evaluate(seq.iterator()); - - // Second pass -- wrap with SequentialPipeline.op - U answer2 = SequentialPipeline.wrap(seq).pipeline(op); - Assert.assertEquals(answer1, answer2); - - // Third pass -- wrap with ParallelPipeline.op - U answer3 = ParallelPipeline.wrap(par).pipeline(op); - Assert.assertEquals(answer1, answer3); + U answer4 = par(data, op); + Assert.assertTrue(equalator.eval(answer1, answer4)); } private static <U> void assertEquals(ArraySink<U> sink1, ArraySink<U> sink2) { - Assert.assertEquals(sink1.size(), sink2.size()); - Iterator<U> it1 = sink1.iterator(); - Iterator<U> it2 = sink2.iterator(); - while (it1.hasNext()) { - assertTrue(it2.hasNext()); - Assert.assertEquals(it1.next(), it2.next()); + try { + Assert.assertEquals(sink1.size(), sink2.size()); + Iterator<U> it1 = sink1.iterator(); + Iterator<U> it2 = sink2.iterator(); + while (it1.hasNext()) { + assertTrue(it2.hasNext()); + Assert.assertEquals(it1.next(), it2.next()); + } + assertFalse(it2.hasNext()); } - assertFalse(it2.hasNext()); - } - - private static final Integer[] to0 = new Integer[0]; - private static final Integer[] to1 = new Integer[1]; - private static final Integer[] to10 = new Integer[10]; - private static final Integer[] to100 = new Integer[100]; - private static final Integer[] to1000 = new Integer[1000]; - private static final Integer[] reversed = new Integer[100]; - private static final Integer[] ones = new Integer[100]; - private static final Integer[] twice = new Integer[200]; - private static final Integer[] pseudoRandom; - - static { - Integer[][] arrays = {to0, to1, to10, to100, to1000}; - for (Integer[] arr : arrays) { - for (int i = 0; i < arr.length; i++) { - arr[i] = i; - } + catch (AssertionError e) { + System.out.printf("Expected %s, found %s%n", sink1, sink2); + throw e; } - for (int i = 0; i < reversed.length; i++) { - reversed[i] = reversed.length - i; - } - for (int i = 0; i < ones.length; i++) { - ones[i] = 1; - } - System.arraycopy(to100, 0, twice, 0, to100.length); - System.arraycopy(to100, 0, twice, to100.length, to100.length); - pseudoRandom = new Integer[LambdaTestHelpers.LONG_STRING.length()]; - for (int i = 0; i < LambdaTestHelpers.LONG_STRING.length(); i++) { - pseudoRandom[i] = (int)LambdaTestHelpers.LONG_STRING.charAt(i); - } - } - - // Return an array of Integer[] - @DataProvider(name = "opArrays") - public static Object[][] makeOpArrays() { - return new Object[][]{ - {"to zero", to0}, - {"to one", to1}, - {"to ten", to10}, - {"to 100", to100}, - {"to 1000", to1000}, - {"100 ones", ones}, - {"to 100 twice", twice}, - {"reversed", reversed}, - {"pseudorandom", pseudoRandom}}; } private static class ArraySink<T> implements Sink<T>, Traversable<T>, Sized { @@ -237,7 +260,7 @@ @SuppressWarnings("unchecked") public ArraySink(int initialSize) { - array = (T[])new Object[initialSize]; + array = (T[]) new Object[initialSize]; } public ArraySink() { @@ -253,7 +276,7 @@ @Override public Iterator<T> iterator() { - return Arrays.iterator(array); + return Arrays.iterator(array, 0, offset); } @Override @@ -271,5 +294,43 @@ public int size() { return offset; } + + @Override + public String toString() { + return String.format("ArraySink[%d](%s)", offset, Arrays.toString(array)); + } + } + + private static class DummyParallelOpHelper<T> implements ParallelOp.ParallelOpHelper<T,T> { + private final T[] data; + + public DummyParallelOpHelper(T[] data) { + this.data = data; + } + + @Override + public int suggestDepth() { + return ForkJoinUtils.suggestDepth(data.length); + } + + @Override + public Spliterator spliterator() { + return Arrays.spliterator(data); + } + + @Override + public Sink sink(Sink sink) { + return sink; + } + + @Override + public Iterator iterator() { + return Arrays.iterator(data); + } + + @Override + public Object invoke(ForkJoinTask task) { + return ForkJoinUtils.defaultFJPool().invoke(task); + } } }
--- a/test-ng/tests/org/openjdk/tests/java/util/streams/ops/ToArrayOpTest.java Mon Aug 20 16:10:37 2012 -0400 +++ b/test-ng/tests/org/openjdk/tests/java/util/streams/ops/ToArrayOpTest.java Sat Aug 25 19:03:55 2012 -0400 @@ -24,6 +24,8 @@ */ package org.openjdk.tests.java.util.streams.ops; +import org.openjdk.tests.java.util.LambdaTestHelpers; +import org.openjdk.tests.java.util.streams.StreamTestDataProvider; import org.testng.annotations.Test; import java.util.streams.ops.ToArrayOp; @@ -41,15 +43,14 @@ public class ToArrayOpTest extends StreamOpTestCase { public void testRawIterator() { ToArrayOp<Integer> op = ToArrayOp.singleton(); - assertCountSum((List<Integer>) (List) Arrays.asList( - iteratorToStatefulSink(countTo(0).iterator(), op.sink())), 0, 0); - assertCountSum((List<Integer>) (List) Arrays.asList( - iteratorToStatefulSink(countTo(10).iterator(), op.sink())), 10, 55); + Object[] result0 = iteratorToStatefulSink(countTo(0).iterator(), op.sink()); + Object[] result10 = iteratorToStatefulSink(countTo(10).iterator(), op.sink()); + assertCountSum(Arrays.asList(result0), 0, 0); + assertCountSum(Arrays.asList(result10), 10, 55); } - @Test(dataProvider = "opArrays") + @Test(dataProvider = "opArrays", dataProviderClass = StreamTestDataProvider.class) public void testOps(String name, Integer[] data) { - ToArrayOp<Integer> op = ToArrayOp.singleton(); - assertConsistentOpBehavior(data, op); + assertConsistentOpBehavior(data, ToArrayOp.<Integer>singleton(), Arrays::equals); } }
--- a/test-ng/tests/org/openjdk/tests/java/util/streams/ops/UniqOpTest.java Mon Aug 20 16:10:37 2012 -0400 +++ b/test-ng/tests/org/openjdk/tests/java/util/streams/ops/UniqOpTest.java Sat Aug 25 19:03:55 2012 -0400 @@ -24,12 +24,10 @@ */ package org.openjdk.tests.java.util.streams.ops; +import org.openjdk.tests.java.util.streams.StreamTestDataProvider; import org.testng.annotations.Test; import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.streams.SequentialPipeline; import java.util.streams.ops.UniqOp; import static org.openjdk.tests.java.util.LambdaTestHelpers.*; @@ -48,10 +46,11 @@ assertCountSum(UniqOp.<Integer>singleton().iterator(countTo(10).iterator()), 10, 55); } - @Test(dataProvider = "opArrays") + @Test(dataProvider = "opArrays", dataProviderClass = StreamTestDataProvider.class) public void testOps(String name, Integer[] data) { - UniqOp<Integer> op = UniqOp.<Integer>singleton(); - assertUnique(new SequentialPipeline<>(Arrays.asList(data), op).into(new ArrayList<Integer>()).iterator()); + UniqOp<Integer> op = UniqOp.singleton(); + ArrayList<Integer> result = seq(data, op).into(new ArrayList<Integer>()); + assertUnique(result.iterator()); assertConsistentOpBehavior(data, op, null); } }
--- a/test-ng/tests/org/openjdk/tests/javac/LambdaTranslationTest1.java Mon Aug 20 16:10:37 2012 -0400 +++ b/test-ng/tests/org/openjdk/tests/javac/LambdaTranslationTest1.java Sat Aug 25 19:03:55 2012 -0400 @@ -116,7 +116,7 @@ b7b.accept("implicit this:"); assertResult("b9: implicit this: instance:flew"); - Sink<Object> b10 = t -> {setResult(String.format("b10: new Thing: %s", (new LT1Thing(t)).str));}; + Sink<Object> b10 = t -> {setResult(String.format("b10: new LT1Thing: %s", (new LT1Thing(t)).str));}; b10.accept("thing"); assertResult("b10: new LT1Thing: thing");
--- a/test-ng/tests/org/openjdk/tests/javac/MethodReferenceTestInstanceMethod.java Mon Aug 20 16:10:37 2012 -0400 +++ b/test-ng/tests/org/openjdk/tests/javac/MethodReferenceTestInstanceMethod.java Sat Aug 25 19:03:55 2012 -0400 @@ -37,7 +37,8 @@ public class MethodReferenceTestInstanceMethod { public Stream<String> generate() { return Arrays.asList("one", "two", "three", "four", "five", "six") - .filter(s -> s.length() > 3) + .stream() + .filter(s->s.length() > 3) .map(s -> s.toUpperCase()); }
--- a/test-ng/tests/org/openjdk/tests/separate/SourceModel.java Mon Aug 20 16:10:37 2012 -0400 +++ b/test-ng/tests/org/openjdk/tests/separate/SourceModel.java Sat Aug 25 19:03:55 2012 -0400 @@ -190,20 +190,20 @@ protected void generateName(PrintWriter pw) { pw.print(this.name); - pw.print(this.parameters - .map(x -> x.toString()) - .into(new StringJoiner(",", "<", ">").setEmptyOutput(""))); + pw.print(this.parameters.stream() + .map(x->x.toString()) + .into(new StringJoiner(",", "<", ">").setEmptyOutput(""))); pw.print(" "); } protected void generateBody(PrintWriter pw, String superSpec) { - pw.print(this.supertypes - .map(x -> x.toString()) + pw.print(this.supertypes.stream() + .map(x->x.toString()) .into(new StringJoiner(",", superSpec + " ", " ") .setEmptyOutput(""))); pw.println("{ "); - pw.print(this.methods - .map(x -> x.toString()) + pw.print(this.methods.stream() + .map(x->x.toString()) .into(new StringJoiner(", ", "\n ", "\n") .setEmptyOutput(""))); pw.println("}"); @@ -228,11 +228,13 @@ if (superclass != null) { dependencies.put(superclass.getName(), superclass); } - getSupertypes().map(x -> x.getType()) + getSupertypes().stream() + .map(x->x.getType()) .mapped(x -> x.getName()) - .swap().into(dependencies); + .swap() + .into(dependencies); // Do these last so that they override - this.typeDependencies.mapped(x -> x.getName()) + this.typeDependencies.stream().mapped(x->x.getName()) .swap().into(dependencies); return dependencies.values(); } @@ -378,8 +380,8 @@ public void generate(PrintWriter pw) { pw.print(supertype.getName()); - pw.print(getArguments() - .map(x -> x.toString()) + pw.print(getArguments().stream() + .map(x->x.toString()) .into(new StringJoiner(",", "<", ">").setEmptyOutput(""))); } } @@ -412,8 +414,8 @@ protected void generateDecl(PrintWriter pw) { pw.printf("%s %s(", returnType, name); - pw.print(parameters - .map(x -> x.toString()) + pw.print(parameters.stream() + .map(x->x.toString()) .into(new StringJoiner(","))); pw.print(")"); }
--- a/test-ng/tests/org/openjdk/tests/separate/TestHarness.java Mon Aug 20 16:10:37 2012 -0400 +++ b/test-ng/tests/org/openjdk/tests/separate/TestHarness.java Sat Aug 25 19:03:55 2012 -0400 @@ -25,18 +25,16 @@ package org.openjdk.tests.separate; +import org.testng.ITestResult; +import org.testng.annotations.AfterMethod; + import java.lang.reflect.InvocationTargetException; -import java.util.*; -import java.io.*; +import java.util.Arrays; +import java.util.HashSet; +import java.util.StringJoiner; -import org.testng.ITestResult; -import org.testng.annotations.Test; -import org.testng.annotations.BeforeMethod; -import org.testng.annotations.AfterMethod; -import org.testng.annotations.AfterSuite; - +import static org.openjdk.tests.separate.SourceModel.Class; import static org.openjdk.tests.separate.SourceModel.*; -import static org.openjdk.tests.separate.SourceModel.Class; import static org.testng.Assert.*; public class TestHarness { @@ -124,7 +122,7 @@ Class stub = new Class(specimen.getName(), cm); String params = - Arrays.asList(args).into(new StringJoiner(", ")).toString(); + Arrays.asList(args).stream().into(new StringJoiner(", ")).toString(); StaticMethod sm = new StaticMethod( method.getReturnType(), method.getName(), @@ -154,7 +152,7 @@ Class cstub = new Class(specimen.getName()); String params = - Arrays.asList(args).into(new StringJoiner(", ")).toString(); + Arrays.asList(args).stream().into(new StringJoiner(", ")).toString(); StaticMethod sm = StaticMethod.returns( String.format("((%s)(new %s())).%s(%s)", iface.toString(),
--- a/test-ng/tests/org/openjdk/tests/vm/FDSeparateCompilationTest.java Mon Aug 20 16:10:37 2012 -0400 +++ b/test-ng/tests/org/openjdk/tests/vm/FDSeparateCompilationTest.java Sat Aug 25 19:03:55 2012 -0400 @@ -25,10 +25,7 @@ package org.openjdk.tests.vm; -import java.lang.reflect.*; import java.util.*; -import java.io.File; -import java.io.IOException; import org.testng.ITestResult; import org.testng.annotations.Test; @@ -62,8 +59,8 @@ ArrayList<Object[]> allCases = new ArrayList<>(); HierarchyGenerator hg = new HierarchyGenerator(); - hg.getOK().map(x -> new Object[] { x }).into(allCases); - hg.getErr().map(x -> new Object[] { x }).into(allCases); + hg.getOK().stream().map(x->new Object[]{x}).into(allCases); + hg.getErr().stream().map(x -> new Object[] { x }).into(allCases); return allCases.toArray(new Object[0][]); }