OpenJDK / lambda / lambda / jdk
changeset 5880:76ff1fc6f813 it2-bootstrap
Combine various XxxSink classes into two; move Sink out of public API
line wrap: on
line diff
--- a/src/share/classes/java/lang/CharSequence.java Thu Aug 30 11:42:29 2012 -0400 +++ b/src/share/classes/java/lang/CharSequence.java Thu Aug 30 19:40:53 2012 -0400 @@ -28,7 +28,7 @@ import java.util.Iterator; import java.util.NoSuchElementException; import java.util.Traversable; -import java.util.functions.Sink; +import java.util.functions.Block; import java.util.streams.Stream; import java.util.streams.Streams; @@ -152,9 +152,9 @@ } @Override - public void forEach(Sink<? super Character> sink) { + public void forEach(Block<? super Character> block) { for (int i = 0; i < cs.length(); i++) - sink.accept(cs.charAt(i)); + block.apply(cs.charAt(i)); } public Iterator<Character> iterator() { @@ -183,11 +183,11 @@ } @Override - public void forEach(Sink<? super Integer> sink) { + public void forEach(Block<? super Integer> block) { for (int i = 0; i < cs.length(); ) { int cp = Character.codePointAt(cs, i); i += Character.charCount(cp); - sink.accept(cp); + block.apply(cp); } }
--- a/src/share/classes/java/util/Arrays.java Thu Aug 30 11:42:29 2012 -0400 +++ b/src/share/classes/java/util/Arrays.java Thu Aug 30 19:40:53 2012 -0400 @@ -3757,9 +3757,9 @@ } @Override - public void forEach(Sink<? super T> sink) { + public void forEach(Block<? super T> block) { while (curIndex < endIndex) { - sink.accept(array[curIndex++]); + block.apply(array[curIndex++]); } } @@ -3822,7 +3822,7 @@ } } - private static class ArraySink<T> implements Sink<T> { + private static class ArraySink<T> implements Sink.OfLinear<T> { private final T[] array; private final int offset; private final int length; @@ -3906,11 +3906,11 @@ return asStreamable(array, 0, array.length); } - public static<T> Sink<T> sink(T[] array) { + public static<T> Sink<T, ?, ?> sink(T[] array) { return sink(array, 0, array.length); } - public static<T> Sink<T> sink(T[] array, int offset, int length) { + public static<T> Sink<T, ?, ?> sink(T[] array, int offset, int length) { return new ArraySink<>(array, offset, length); }
--- a/src/share/classes/java/util/Collection.java Thu Aug 30 11:42:29 2012 -0400 +++ b/src/share/classes/java/util/Collection.java Thu Aug 30 19:40:53 2012 -0400 @@ -25,8 +25,8 @@ package java.util; +import java.util.functions.Block; import java.util.functions.Predicate; -import java.util.functions.Sink; import java.util.streams.Stream; import java.util.streams.Streamable; import java.util.streams.Streams; @@ -495,9 +495,9 @@ } @Override - void forEach(Sink<? super E> sink) default { + void forEach(Block<? super E> block) default { for (E e: this) { - sink.accept(e); + block.apply(e); } } @@ -506,6 +506,6 @@ if (stream.isParallel()) stream = stream.sequential(); // @@@ Would use this::add but compiler doens't support that yet. - stream.forEach((E e)-> { add(e); }); + stream.forEach((E e) -> { add(e); }); } }
--- a/src/share/classes/java/util/Spliterator.java Thu Aug 30 11:42:29 2012 -0400 +++ b/src/share/classes/java/util/Spliterator.java Thu Aug 30 19:40:53 2012 -0400 @@ -24,7 +24,7 @@ */ package java.util; -import java.util.functions.Sink; +import java.util.functions.Block; /** * Spliterator @@ -47,7 +47,7 @@ Spliterator<T> split(); /** Process the remaining elements sequentially. */ - void forEach(Sink<? super T> sink); + void forEach(Block<? super T> sink); /** * Return the number of elements remaining to be processed, if the count can
--- a/src/share/classes/java/util/Traversable.java Thu Aug 30 11:42:29 2012 -0400 +++ b/src/share/classes/java/util/Traversable.java Thu Aug 30 19:40:53 2012 -0400 @@ -24,7 +24,7 @@ */ package java.util; -import java.util.functions.Sink; +import java.util.functions.Block; /** * Implementing this interface allows an object to indicate that it provides @@ -38,10 +38,10 @@ /** * Each element of the object will be provided to the specified Sink. * - * @param sink The Sink to which elements will be provided. + * @param block The Sink to which elements will be provided. */ - public void forEach(Sink<? super T> sink) default { + public void forEach(Block<? super T> block) default { for (T t : this) - sink.accept(t); + block.apply(t); } }
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/share/classes/java/util/functions/BiBlock.java Thu Aug 30 19:40:53 2012 -0400 @@ -0,0 +1,68 @@ +/* + * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. Oracle designates this + * particular file as subject to the "Classpath" exception as provided + * by Oracle in the LICENSE file that accompanied this code. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ +package java.util.functions; + +/** + * BiBlock + * + * @author Brian Goetz + */ +/** + * Performs operations upon an input object which may modify that object and/or + * external state (other objects). + * + * <p>All block implementations are expected to: + * <ul> + * <li>When used for aggregate operations upon many elements blocks + * should not assume that the {@code apply} operation will be called upon + * elements in any specific order.</li> + * </ul> + * + * @param <L> the type of input objects provided to {@code apply}. + * @param <R> the type of input objects provided to {@code apply}. + */ +public interface BiBlock<L, R> { + + /** + * Performs operations upon the provided object which may modify that object + * and/or external state. + * + * @param l an input object + */ + void apply(L l, R r); + + /** + * Returns a Block which performs in sequence the {@code apply} methods of + * multiple Blocks. This Block's {@code apply} method is performed followed + * by the {@code apply} method of the specified Block operation. + * + * @param other an additional Block which will be chained after this Block + * @return a Block which performs in sequence the {@code apply} method of + * this Block and the {@code apply} method of the specified Block operation + */ + public BiBlock<L, R> chain(BiBlock<? super L, ? super R> other) default { + return (l, r) -> { apply(l, r); other.apply(l, r); }; + } +}
--- a/src/share/classes/java/util/functions/BiMapper.java Thu Aug 30 11:42:29 2012 -0400 +++ b/src/share/classes/java/util/functions/BiMapper.java Thu Aug 30 19:40:53 2012 -0400 @@ -55,10 +55,10 @@ * * @param <T> the type of input objects provided to the {@code map} operation. * @param <U> the type of input objects provided to the {@code map} operation. - * @param <V> the type of output objects from {@code map} operation. May be the + * @param <R> the type of output objects from {@code map} operation. May be the * same type as either {@code <T>} or {@code <U>}. */ -public interface BiMapper<T, U, V> { +public interface BiMapper<T, U, R> { /** * Map the provided input objects to an appropriate output object. @@ -67,20 +67,20 @@ * @param u another object to be mapped. * @return the mapped output object. */ - V map(T t, U u); + R map(T t, U u); /** * Combine with another mapper producing a mapper which preforms both * mappings. * - * @param <V> Type of output objects from the combined mapper. May be the + * @param <W> Type of output objects from the combined mapper. May be the * same type as {@code <U>}. * @param after An additional mapping to be applied to the result of this * mapping. * @return A mapper which performs both the original mapping followed by * a second mapping. */ - public <W> BiMapper<T, U, W> compose(Mapper<? super V, ? extends W> after) default { + public <W> BiMapper<T, U, W> compose(Mapper<? super R, ? extends W> after) default { throw new UnsupportedOperationException("Not yet implemented."); } }
--- a/src/share/classes/java/util/functions/BiPredicate.java Thu Aug 30 11:42:29 2012 -0400 +++ b/src/share/classes/java/util/functions/BiPredicate.java Thu Aug 30 19:40:53 2012 -0400 @@ -52,8 +52,8 @@ * elements in any specific order.</li> * </ul> * - * @param <L> the type of input objects provided to {@code eval}. - * @param <R> the type of input objects provided to {@code eval}. + * @param <L> the type of input objects provided to {@code test}. + * @param <R> the type of input objects provided to {@code test}. */ public interface BiPredicate<L,R> {
--- a/src/share/classes/java/util/functions/BiSink.java Thu Aug 30 11:42:29 2012 -0400 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,70 +0,0 @@ -/* - * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved. - * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. - * - * This code is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License version 2 only, as - * published by the Free Software Foundation. Oracle designates this - * particular file as subject to the "Classpath" exception as provided - * by Oracle in the LICENSE file that accompanied this code. - * - * This code is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - * version 2 for more details (a copy is included in the LICENSE file that - * accompanied this code). - * - * You should have received a copy of the GNU General Public License version - * 2 along with this work; if not, write to the Free Software Foundation, - * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. - * - * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA - * or visit www.oracle.com if you need additional information or have any - * questions. - */ -package java.util.functions; - -import java.util.Mapping; -import java.util.streams.StatefulBiSink; -import java.util.streams.StatefulSink; - -/** - * BiSink - * - * @author Brian Goetz - */ -public interface BiSink<T, U> extends Sink<Mapping<T,U>> { - void accept(T t, U u); - - void accept(Mapping<T,U> mapping) default { - accept(mapping.getKey(), mapping.getValue()); - } - - BiSink<T,U> tee(BiSink<? super T, ? super U> other) default { - return (T t, U u) -> { other.accept(t, u); accept(t, u); }; - } - - @Override - StatefulBiSink<T,U,Void> asStatefulSink() default { - if (this instanceof StatefulSink) - return (StatefulBiSink<T, U, Void>) this; - else { - // @@@ Compiler bug workaround - have to capture a fake outer 'this' - final BiSink<T,U> thiz = this; - return new StatefulBiSink<T, U, Void>() { - @Override - public void begin(int size) { } - - @Override - public Void end() { - return null; - } - - @Override - public void accept(T t, U u) { - thiz.accept(t, u); - } - }; - } - } -}
--- a/src/share/classes/java/util/functions/FlatMapper.java Thu Aug 30 11:42:29 2012 -0400 +++ b/src/share/classes/java/util/functions/FlatMapper.java Thu Aug 30 19:40:53 2012 -0400 @@ -36,5 +36,5 @@ * @param sink Destination for mapped elements. * @param element The element to be mapped. */ - void flatMapInto(Sink<? super R> sink, T element); + void flatMapInto(Block<? super R> sink, T element); }
--- a/src/share/classes/java/util/functions/Sink.java Thu Aug 30 11:42:29 2012 -0400 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,70 +0,0 @@ -/* - * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved. - * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. - * - * This code is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License version 2 only, as - * published by the Free Software Foundation. Oracle designates this - * particular file as subject to the "Classpath" exception as provided - * by Oracle in the LICENSE file that accompanied this code. - * - * This code is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - * version 2 for more details (a copy is included in the LICENSE file that - * accompanied this code). - * - * You should have received a copy of the GNU General Public License version - * 2 along with this work; if not, write to the Free Software Foundation, - * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. - * - * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA - * or visit www.oracle.com if you need additional information or have any - * questions. - */ -package java.util.functions; - -import java.util.streams.StatefulSink; - -/** - * A receiver of elements. The counterpart to {@code Stream}. - * - * @param <T> The type of elements accepted by this Sink. - * - * @author Brian Goetz - */ -public interface Sink<T> { - /** - * Accept an element. - * - * @param t The element to be accepted. - */ - void accept(T t); - - public Sink<T> tee(Sink<? super T> other) default { - return (T t) -> { other.accept(t); accept(t); }; - } - - public StatefulSink<T, Void> asStatefulSink() default { - if (this instanceof StatefulSink) - return (StatefulSink<T, Void>) this; - else { - // @@@ Compiler bug workaround - have to capture a fake outer 'this' - final Sink<T> thiz = this; - return new StatefulSink<T, Void>() { - @Override - public void begin(int size) { } - - @Override - public Void end() { - return null; - } - - @Override - public void accept(T t) { - thiz.accept(t); - } - }; - } - } -}
--- a/src/share/classes/java/util/streams/AbstractPipeline.java Thu Aug 30 11:42:29 2012 -0400 +++ b/src/share/classes/java/util/streams/AbstractPipeline.java Thu Aug 30 19:40:53 2012 -0400 @@ -27,7 +27,6 @@ import java.util.*; import java.util.concurrent.ForkJoinTask; import java.util.concurrent.ForkJoinUtils; -import java.util.functions.Sink; import java.util.streams.ops.*; /** @@ -149,27 +148,19 @@ sink.begin(-1); // @@@ supply size if known (iterator was null, chain is size-preserving, initial size known) while (chain.hasNext()) sink.accept(chain.next()); - return sink.end(); + sink.end(); + return sink.getAndClearState(); } else { // Push traversal - int nStateful = 0; - StatefulSink[] statefulSinks = new StatefulSink[ops.length]; StatefulSink<U, V> terminalSink = terminal.sink(); Sink sink = terminalSink; - terminalSink.begin(-1); // @@@ supply size if known to terminal stage - for (int i=ops.length-1; i >= 0; i--) { + for (int i=ops.length-1; i >= 0; i--) sink = ops[i].sink(sink); - if (ops[i].isStateful()) { - StatefulSink s = (StatefulSink) sink; - statefulSinks[nStateful++] = s; - s.begin(-1); // @@@ supply size if known to *this* stage - } - } + sink.begin(-1); // @@@ supply size if known source.forEach(sink); - for (int i = nStateful-1; i >= 0; i--) - statefulSinks[i].end(); - return terminalSink.end(); + sink.end(); + return terminalSink.getAndClearState(); } } @@ -190,14 +181,14 @@ } @Override - public Sink<T> sink(Sink<? super U> sink) { - Sink<?> chain = sink; + public Sink<T, ?, ?> sink(Sink sink) { + Sink chain = sink; for (int i = ops.length-1; i >= 0; i--) { IntermediateOp op = ops[i]; chain = op.sink(chain); } - return (Sink<T>) chain; + return chain; } @Override
--- a/src/share/classes/java/util/streams/AbstractSequentialStreamAccessor.java Thu Aug 30 11:42:29 2012 -0400 +++ b/src/share/classes/java/util/streams/AbstractSequentialStreamAccessor.java Thu Aug 30 19:40:53 2012 -0400 @@ -24,9 +24,7 @@ */ package java.util.streams; -import java.util.Iterator; import java.util.Spliterator; -import java.util.functions.Sink; /** * AbstractSequentialStreamAccessor
--- a/src/share/classes/java/util/streams/LinearPipeline.java Thu Aug 30 11:42:29 2012 -0400 +++ b/src/share/classes/java/util/streams/LinearPipeline.java Thu Aug 30 19:40:53 2012 -0400 @@ -90,8 +90,8 @@ } @Override - public void forEach(Sink<? super U> sink) { - pipeline(new ForEachOp<>(sink)); + public void forEach(Block<? super U> sink) { + pipeline(ForEachOp.make(sink)); } @Override
--- a/src/share/classes/java/util/streams/MapPipeline.java Thu Aug 30 11:42:29 2012 -0400 +++ b/src/share/classes/java/util/streams/MapPipeline.java Thu Aug 30 19:40:53 2012 -0400 @@ -68,8 +68,8 @@ } @Override - public void forEach(BiSink<K, V> sink) { - pipeline(new ForEachOp<>(sink)); + public void forEach(BiBlock<K, V> sink) { + pipeline(ForEachOp.make(sink)); } public<U> U pipeline(TerminalOp<Mapping<K,V>, U> op) {
--- a/src/share/classes/java/util/streams/MapStream.java Thu Aug 30 11:42:29 2012 -0400 +++ b/src/share/classes/java/util/streams/MapStream.java Thu Aug 30 19:40:53 2012 -0400 @@ -119,7 +119,7 @@ * * @param sink the Sink via which all elements will be processed. */ - void forEach(BiSink<K, V> sink); + void forEach(BiBlock<K, V> sink); // @@@ Map, or MapFillable? <A extends Map<K, V>> A into(A target);
--- a/src/share/classes/java/util/streams/MapStreamAccessor.java Thu Aug 30 11:42:29 2012 -0400 +++ b/src/share/classes/java/util/streams/MapStreamAccessor.java Thu Aug 30 19:40:53 2012 -0400 @@ -26,7 +26,7 @@ import java.util.MapIterator; import java.util.Mapping; -import java.util.functions.BiSink; +import java.util.functions.BiBlock; /** * MapStreamAccessor @@ -34,7 +34,7 @@ * @author Brian Goetz */ public interface MapStreamAccessor<K, V> extends StreamAccessor<Mapping<K,V>>, MapIterator<K,V> { - void forEach(BiSink<? super K, ? super V> sink); + void forEach(BiBlock<? super K, ? super V> sink); Stream.Shape getShape() default { return Stream.Shape.KEY_VALUE; } }
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/share/classes/java/util/streams/Sink.java Thu Aug 30 19:40:53 2012 -0400 @@ -0,0 +1,117 @@ +/* + * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. Oracle designates this + * particular file as subject to the "Classpath" exception as provided + * by Oracle in the LICENSE file that accompanied this code. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ +package java.util.streams; + +import java.util.Mapping; +import java.util.functions.Block; + +/** + * InternalSink + * + * @author Brian Goetz + */ +public interface Sink<T, K, V> extends Block<T> { + void accept(T t); + + void accept(K k, V v) default { throw new IllegalStateException(); } + + /** + * Reset the sink state to receive a fresh data set. This is used when a + * Sink is being reused by multiple calculations. + * @param size The approximate size of the data to be pushed downstream, if + * known or -1 if size is unknown. + */ + void begin(int size); + + /** + * Indicate that all element that is going to be pushed has been pushed. + * Chained sinks should dump their contents downstream and clear any stored + * state; terminal sinks should return their terminal state and clear any + * stored state. + * + * @return The terminal state of the sink. + */ + void end(); + + @Override + void apply(T t) default { accept(t); } + + + public interface OfLinear<T> extends Sink<T, Object, Object> { + @Override + void begin(int size) default { } + + @Override + void end() default { } + } + + public static abstract class ChainedLinear<T> implements OfLinear<T> { + protected final Sink downstream; + + public ChainedLinear(Sink downstream) { + this.downstream = downstream; + } + + @Override + public void begin(int size) { + downstream.begin(size); + } + + @Override + public void end() { + downstream.end(); + } + } + + public interface OfMap<K, V> extends Sink<Mapping<K, V>, K, V> { + @Override + void accept(Mapping<K, V> mapping) default { + // @@@ Temporary sanity check -- probably should throw ISE here + System.out.println("Someone called accept(Mapping)"); + accept(mapping.getKey(), mapping.getValue()); + } + + @Override + void accept(K k, V v); + } + + public static abstract class ChainedMap<K,V> implements OfMap<K,V> { + protected final Sink downstream; + + public ChainedMap(Sink downstream) { + this.downstream = downstream; + } + + @Override + public void begin(int size) { + downstream.begin(size); + } + + @Override + public void end() { + downstream.end(); + } + } +}
--- a/src/share/classes/java/util/streams/StatefulBiSink.java Thu Aug 30 11:42:29 2012 -0400 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,36 +0,0 @@ -/* - * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved. - * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. - * - * This code is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License version 2 only, as - * published by the Free Software Foundation. Oracle designates this - * particular file as subject to the "Classpath" exception as provided - * by Oracle in the LICENSE file that accompanied this code. - * - * This code is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - * version 2 for more details (a copy is included in the LICENSE file that - * accompanied this code). - * - * You should have received a copy of the GNU General Public License version - * 2 along with this work; if not, write to the Free Software Foundation, - * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. - * - * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA - * or visit www.oracle.com if you need additional information or have any - * questions. - */ -package java.util.streams; - -import java.util.Mapping; -import java.util.functions.BiSink; - -/** - * StatefulBiSink - * - * @author Brian Goetz - */ -public interface StatefulBiSink<K,V,R> extends BiSink<K,V>, StatefulSink<Mapping<K,V>,R> { -}
--- a/src/share/classes/java/util/streams/StatefulSink.java Thu Aug 30 11:42:29 2012 -0400 +++ b/src/share/classes/java/util/streams/StatefulSink.java Thu Aug 30 19:40:53 2012 -0400 @@ -24,32 +24,23 @@ */ package java.util.streams; -import java.util.functions.Sink; - /** - * A Sink which accumulates state as elements are accepted. + * A KitchenSink which accumulates state as elements are accepted. * * @param <T> The type of elements to be accepted. - * @param <V> The type of the terminal state. + * @param <R> The type of the terminal state. * * @author Brian Goetz */ -public interface StatefulSink<T, V> extends Sink<T> { +public interface StatefulSink<T, R> extends Sink<T, Object, Object> { /** - * Reset the sink state to receive a fresh data set. This is used when a - * Sink is being reused by multiple calculations. - * @param size The approximate size of the data to be pushed downstream, if - * known or -1 if size is unknown. + * Retrieve, and clear, the current state of the sink. */ - void begin(int size); + R getAndClearState(); - /** - * Indicate that all element that is going to be pushed has been pushed. - * Chained sinks should dump their contents downstream and clear any stored - * state; terminal sinks should return their terminal state and clear any - * stored state. - * - * @return The terminal state of the sink. - */ - V end(); + @Override + void begin(int size) default { } + + @Override + void end() default { } }
--- a/src/share/classes/java/util/streams/Stream.java Thu Aug 30 11:42:29 2012 -0400 +++ b/src/share/classes/java/util/streams/Stream.java Thu Aug 30 19:40:53 2012 -0400 @@ -99,7 +99,7 @@ Stream<T> cumulate(BinaryOperator<T> operator); - void forEach(Sink<? super T> sink); + void forEach(Block<? super T> sink); <A extends Fillable<? super T>> A into(A target);
--- a/src/share/classes/java/util/streams/StreamAccessor.java Thu Aug 30 11:42:29 2012 -0400 +++ b/src/share/classes/java/util/streams/StreamAccessor.java Thu Aug 30 19:40:53 2012 -0400 @@ -26,7 +26,7 @@ import java.util.Iterator; import java.util.Spliterator; -import java.util.functions.Sink; +import java.util.functions.Block; /** * StreamAccessor @@ -36,7 +36,7 @@ * @author Brian Goetz */ public interface StreamAccessor<T> { - void forEach(Sink<? super T> sink); + void forEach(Block<? super T> sink); Iterator<T> iterator();
--- a/src/share/classes/java/util/streams/StreamBuilder.java Thu Aug 30 11:42:29 2012 -0400 +++ b/src/share/classes/java/util/streams/StreamBuilder.java Thu Aug 30 19:40:53 2012 -0400 @@ -25,17 +25,17 @@ package java.util.streams; import java.util.*; -import java.util.functions.Sink; +import java.util.functions.Block; /** * StreamBuilder * * @author Brian Goetz */ -public interface StreamBuilder<T> extends Sized, Streamable<T>, Traversable<T>, Sink<T> /* , Fillable<T> */ { +public interface StreamBuilder<T> extends Sized, Streamable<T>, Traversable<T>, Sink.OfLinear<T> /* , Fillable<T> */ { @Override - void forEach(Sink<? super T> sink); + void forEach(Block<? super T> block); void clear();
--- a/src/share/classes/java/util/streams/StreamBuilders.java Thu Aug 30 11:42:29 2012 -0400 +++ b/src/share/classes/java/util/streams/StreamBuilders.java Thu Aug 30 19:40:53 2012 -0400 @@ -28,7 +28,7 @@ import java.util.Arrays; import java.util.Iterator; import java.util.List; -import java.util.functions.Sink; +import java.util.functions.Block; /** * Utilities for building streams. @@ -91,9 +91,9 @@ } @Override - public void forEach(Sink<? super T> sink) { + public void forEach(Block<? super T> block) { for (int i=0; i<curSize; i++) { - sink.accept(array[i]); + block.apply(array[i]); } } @@ -174,8 +174,8 @@ } @Override - public void forEach(Sink<? super T> sink) { - list.forEach(sink); + public void forEach(Block<? super T> block) { + list.forEach(block); } @Override
--- a/src/share/classes/java/util/streams/Streamable.java Thu Aug 30 11:42:29 2012 -0400 +++ b/src/share/classes/java/util/streams/Streamable.java Thu Aug 30 19:40:53 2012 -0400 @@ -24,10 +24,6 @@ */ package java.util.streams; -import java.util.*; -import java.util.functions.*; -import java.util.streams.ops.*; - /** * Streamable *
--- a/src/share/classes/java/util/streams/Streams.java Thu Aug 30 11:42:29 2012 -0400 +++ b/src/share/classes/java/util/streams/Streams.java Thu Aug 30 19:40:53 2012 -0400 @@ -25,7 +25,7 @@ package java.util.streams; import java.util.*; -import java.util.functions.Sink; +import java.util.functions.Block; /** * Streams @@ -83,7 +83,7 @@ } @Override - public void forEach(Sink<? super T> sink) { } + public void forEach(Block<? super T> block) { } @Override public int getRemainingSizeIfKnown() { @@ -124,9 +124,9 @@ } @Override - public void forEach(Sink<? super T> sink) { + public void forEach(Block<? super T> block) { while (it.hasNext()) - sink.accept(it.next()); + block.apply(it.next()); } @Override @@ -173,8 +173,8 @@ } @Override - public void forEach(Sink<? super T> sink) { - spliterator.forEach(sink); + public void forEach(Block<? super T> block) { + spliterator.forEach(block); } @Override @@ -240,14 +240,14 @@ } @Override - public void forEach(Sink<? super T> sink) { + public void forEach(Block<? super T> block) { if (iterator == null) { - traversable.forEach(sink); + traversable.forEach(block); iterator = Collections.emptyIterator(); } else { while (iterator.hasNext()) - sink.accept(iterator.next()); + block.apply(iterator.next()); } }
--- a/src/share/classes/java/util/streams/ops/BiAllMatchOp.java Thu Aug 30 11:42:29 2012 -0400 +++ b/src/share/classes/java/util/streams/ops/BiAllMatchOp.java Thu Aug 30 19:40:53 2012 -0400 @@ -47,7 +47,7 @@ @Override public Boolean evaluate(Iterator<Mapping<K,V>> iterator) { - return allMatch(new MapIterator.IteratorAdapter(iterator), predicate); + return allMatch(new MapIterator.IteratorAdapter<>(iterator), predicate); } public static <K,V> boolean allMatch(MapIterator<K,V> iterator, BiPredicate<? super K, ? super V> predicate) {
--- a/src/share/classes/java/util/streams/ops/BiAnyMatchOp.java Thu Aug 30 11:42:29 2012 -0400 +++ b/src/share/classes/java/util/streams/ops/BiAnyMatchOp.java Thu Aug 30 19:40:53 2012 -0400 @@ -47,7 +47,7 @@ @Override public Boolean evaluate(Iterator<Mapping<K,V>> iterator) { - return anyMatch(new MapIterator.IteratorAdapter(iterator), predicate); + return anyMatch(new MapIterator.IteratorAdapter<>(iterator), predicate); } public static <K,V> boolean anyMatch(MapIterator<K,V> iterator, BiPredicate<? super K, ? super V> predicate) {
--- a/src/share/classes/java/util/streams/ops/BiFilterOp.java Thu Aug 30 11:42:29 2012 -0400 +++ b/src/share/classes/java/util/streams/ops/BiFilterOp.java Thu Aug 30 19:40:53 2012 -0400 @@ -24,14 +24,9 @@ */ package java.util.streams.ops; -import java.util.Iterator; -import java.util.MapIterator; -import java.util.Mapping; -import java.util.NoSuchElementException; -import java.util.Objects; +import java.util.*; import java.util.functions.BiPredicate; -import java.util.functions.BiSink; -import java.util.functions.Sink; +import java.util.streams.Sink; import java.util.streams.Stream; /** @@ -62,25 +57,14 @@ } @Override - public Sink<Mapping<K,V>> sink(final Sink<? super Mapping<K,V>> sink) { - if (!(sink instanceof BiSink)) { - throw new IllegalArgumentException(String.format("Expected argument sink to be of type BiSync; found %s", sink.getClass())); - } - final BiSink<K, V> biSink = (BiSink<K, V>) (BiSink) Objects.requireNonNull(sink); - - return new BiSink<K,V>() { + public Sink<Mapping<K, V>, K, V> sink(Sink sink) { + return new Sink.ChainedMap<K, V>(sink) { @Override public void accept(K k, V v) { if(predicate.test(k, v)) { - biSink.accept(k, v); + downstream.accept(k, v); } } - - @Override - public void accept(Mapping<K, V> entry) { - System.out.println("Warning: executing push(Entry)"); - accept(entry.getKey(), entry.getValue()); - } }; }
--- a/src/share/classes/java/util/streams/ops/BiMapOp.java Thu Aug 30 11:42:29 2012 -0400 +++ b/src/share/classes/java/util/streams/ops/BiMapOp.java Thu Aug 30 19:40:53 2012 -0400 @@ -29,8 +29,7 @@ import java.util.Mapping; import java.util.Objects; import java.util.functions.BiMapper; -import java.util.functions.BiSink; -import java.util.functions.Sink; +import java.util.streams.Sink; import java.util.streams.Stream; /** @@ -67,27 +66,17 @@ } @Override - public BiSink<K,V> sink(final Sink<? super Mapping<K,U>> sink) { - if (!(sink instanceof BiSink)) { - throw new IllegalStateException("Expecting BiSink"); - } - final BiSink<K, U> biSink = (BiSink<K, U>) (Sink) Objects.requireNonNull(sink); - - return new BiSink<K,V>() { + public Sink<Mapping<K, V>, K, V> sink(Sink sink) { + return new Sink.ChainedMap<K,V>(sink) { @Override public void accept(K k, V v) { - biSink.accept(k, mapper.map(k, v)); - } - - @Override - public void accept(Mapping<K, V> entry) { - System.out.println("Warning: executing push(Entry)"); - accept(entry.getKey(), entry.getValue()); + downstream.accept(k, mapper.map(k, v)); } }; } - public static <K, V, VV> MapIterator<K, VV> iterator(Iterator<Mapping<K, V>> source, BiMapper<? super K, ? super V, ? extends VV> mapper) { + public static <K, V, VV> MapIterator<K, VV> iterator(final Iterator<Mapping<K, V>> source, + final BiMapper<? super K, ? super V, ? extends VV> mapper) { Objects.requireNonNull(source); Objects.requireNonNull(mapper); return new MapIterator<K, VV>() { @@ -100,7 +89,7 @@ @Override public Mapping<K, VV> next() { - Mapping<K, V> next = source.next(); + final Mapping<K, V> next = source.next(); current = new Mapping.AbstractMapping<K,VV>() { @Override public K getKey() {
--- a/src/share/classes/java/util/streams/ops/CumulateOp.java Thu Aug 30 11:42:29 2012 -0400 +++ b/src/share/classes/java/util/streams/ops/CumulateOp.java Thu Aug 30 19:40:53 2012 -0400 @@ -24,11 +24,11 @@ */ package java.util.streams.ops; -import java.util.*; -import java.util.concurrent.ForkJoinTask; +import java.util.Iterator; +import java.util.Objects; +import java.util.Spliterator; import java.util.concurrent.RecursiveTask; import java.util.functions.BinaryOperator; -import java.util.functions.Sink; import java.util.streams.*; /** @@ -51,19 +51,15 @@ } @Override - public StatefulSink<T, T> sink(final Sink<? super T> sink) { + public StatefulSink<T, T> sink(final Sink sink) { return new StatefulSink<T, T>() { - private boolean first = true; + private boolean first; private T state; @Override public void begin(int size) { first = true; - } - - @Override - public T end() { - return state; + sink.begin(size); } @Override @@ -77,6 +73,13 @@ } sink.accept(state); } + + @Override + public T getAndClearState() { + T ret = state; + state = null; + return ret; + } }; } @@ -176,10 +179,12 @@ } else { leafData = StreamBuilders.make(); - StatefulSink<T, T> sink = sink(leafData); + StatefulSink<T, T> terminalSink = sink(leafData); + Sink<V, ?, ?> sink = problem.helper.sink(terminalSink); sink.begin(-1); - source.forEach(problem.helper.sink(sink)); - upward = sink.end(); + source.forEach(sink); + sink.end(); + upward = terminalSink.getAndClearState(); // Special case -- if problem.depth == 0, just wrap the result and be done if (isRoot()) return TreeUtils.node(leafData);
--- a/src/share/classes/java/util/streams/ops/FilterOp.java Thu Aug 30 11:42:29 2012 -0400 +++ b/src/share/classes/java/util/streams/ops/FilterOp.java Thu Aug 30 19:40:53 2012 -0400 @@ -28,7 +28,7 @@ import java.util.NoSuchElementException; import java.util.Objects; import java.util.functions.Predicate; -import java.util.functions.Sink; +import java.util.streams.Sink; import java.util.streams.Stream; /** @@ -56,13 +56,13 @@ } @Override - public Sink<T> sink(final Sink<? super T> sink) { + public Sink<T, ?, ?> sink(final Sink sink) { Objects.requireNonNull(sink); - return new Sink<T>() { + return new Sink.ChainedLinear<T>(sink) { @Override public void accept(T t) { if (predicate.test(t)) - sink.accept(t); + downstream.accept(t); } }; }
--- a/src/share/classes/java/util/streams/ops/FlatMapOp.java Thu Aug 30 11:42:29 2012 -0400 +++ b/src/share/classes/java/util/streams/ops/FlatMapOp.java Thu Aug 30 19:40:53 2012 -0400 @@ -28,7 +28,7 @@ import java.util.NoSuchElementException; import java.util.Objects; import java.util.functions.FlatMapper; -import java.util.functions.Sink; +import java.util.streams.Sink; import java.util.streams.Stream; import java.util.streams.StreamBuilder; import java.util.streams.StreamBuilders; @@ -58,11 +58,11 @@ } @Override - public Sink<T> sink(final Sink<? super R> sink) { + public Sink<T,?,?> sink(final Sink sink) { Objects.requireNonNull(sink); - return new Sink<T>() { + return new Sink.ChainedLinear<T>(sink) { public void accept(T t) { - mapper.flatMapInto(sink, t); + mapper.flatMapInto(downstream, t); } }; }
--- a/src/share/classes/java/util/streams/ops/FoldOp.java Thu Aug 30 11:42:29 2012 -0400 +++ b/src/share/classes/java/util/streams/ops/FoldOp.java Thu Aug 30 19:40:53 2012 -0400 @@ -27,6 +27,7 @@ import java.util.Spliterator; import java.util.concurrent.RecursiveTask; import java.util.functions.*; +import java.util.streams.Sink; import java.util.streams.StatefulSink; /** @@ -61,7 +62,7 @@ } @Override - public U end() { + public U getAndClearState() { try { return state; } @@ -107,10 +108,11 @@ } else { final StatefulSink<T, U> reduceStage = op.sink(); - final Sink<V> chain = helper.sink(reduceStage); - reduceStage.begin(-1); + final Sink<V, ?, ?> chain = helper.sink(reduceStage); + chain.begin(-1); source.forEach(chain); - return reduceStage.end(); + chain.end(); + return reduceStage.getAndClearState(); } } }
--- a/src/share/classes/java/util/streams/ops/ForEachOp.java Thu Aug 30 11:42:29 2012 -0400 +++ b/src/share/classes/java/util/streams/ops/ForEachOp.java Thu Aug 30 19:40:53 2012 -0400 @@ -24,9 +24,12 @@ */ package java.util.streams.ops; +import java.util.Mapping; import java.util.Spliterator; import java.util.concurrent.RecursiveAction; -import java.util.functions.Sink; +import java.util.functions.BiBlock; +import java.util.functions.Block; +import java.util.streams.Sink; import java.util.streams.StatefulSink; /** @@ -35,21 +38,54 @@ * @author Brian Goetz */ public class ForEachOp<T> implements TerminalOp<T,Void> { - private final Sink<? super T> sink; + private final StatefulSink<T,Void> sink; - public ForEachOp(Sink<? super T> sink) { + protected ForEachOp(StatefulSink<T, Void> sink) { this.sink = sink; } + public static<T> ForEachOp<T> make(final Block<? super T> block) { + return new ForEachOp<>(new StatefulSink<T, Void>() { + @Override + public void accept(T t) { + block.apply(t); + } + + @Override + public Void getAndClearState() { + return null; + } + }); + } + + public static<K,V> ForEachOp<Mapping<K,V>> make(final BiBlock<? super K, ? super V> block) { + return new ForEachOp<>(new StatefulSink<Mapping<K,V>, Void>() { + @Override + public void accept(Object k, Object v) { + block.apply((K) k, (V) v); + } + + @Override + public void accept(Mapping<K, V> mapping) { + block.apply(mapping.getKey(), mapping.getValue()); + } + + @Override + public Void getAndClearState() { + return null; + } + }); + } + @Override public StatefulSink<T, Void> sink() { - return (StatefulSink<T, Void>) sink.asStatefulSink(); + return sink; } @Override public <V> Void computeParallel(ParallelOpHelper<T, V> helper) { int depth = helper.suggestDepth(); - Sink<V> compoundSink = helper.sink(sink); + Sink<V, ?, ?> compoundSink = helper.sink(sink); Spliterator<V> spliterator = helper.spliterator(); if (depth == 0) { spliterator.forEach(compoundSink); @@ -62,9 +98,9 @@ private static class ForEachTask<T> extends RecursiveAction { private final int depth; private final Spliterator<T> spliterator; - private final Sink<T> sink; + private final Sink<T, ?, ?> sink; - private ForEachTask(int depth, Spliterator<T> spliterator, Sink<T> sink) { + private ForEachTask(int depth, Spliterator<T> spliterator, Sink<T, ?, ?> sink) { this.depth = depth; this.spliterator = spliterator; this.sink = sink;
--- a/src/share/classes/java/util/streams/ops/GroupByOp.java Thu Aug 30 11:42:29 2012 -0400 +++ b/src/share/classes/java/util/streams/ops/GroupByOp.java Thu Aug 30 19:40:53 2012 -0400 @@ -55,7 +55,7 @@ } @Override - public Map<K, Streamable<T>> end() { + public Map<K, Streamable<T>> getAndClearState() { Map<K, Streamable<T>> result = (Map) map; map = null; return result;
--- a/src/share/classes/java/util/streams/ops/IntermediateOp.java Thu Aug 30 11:42:29 2012 -0400 +++ b/src/share/classes/java/util/streams/ops/IntermediateOp.java Thu Aug 30 19:40:53 2012 -0400 @@ -25,7 +25,7 @@ package java.util.streams.ops; import java.util.Iterator; -import java.util.functions.Sink; +import java.util.streams.Sink; import java.util.streams.Stream; /** @@ -76,11 +76,12 @@ * Return a sink which will accept elements, perform the operation upon * each element and send it to the provided sink. * + * * @param sink elements will be sent to this sink after the processing. * @return a sink which will accept elements and perform the operation upon * each element. */ - Sink<T> sink(Sink<? super U> sink); + Sink<T, ?, ?> sink(Sink sink); Stream.Shape inputShape() default { return Stream.Shape.LINEAR; } Stream.Shape outputShape() default { return Stream.Shape.LINEAR; }
--- a/src/share/classes/java/util/streams/ops/MapExtractKeysOp.java Thu Aug 30 11:42:29 2012 -0400 +++ b/src/share/classes/java/util/streams/ops/MapExtractKeysOp.java Thu Aug 30 19:40:53 2012 -0400 @@ -27,8 +27,7 @@ import java.util.Iterator; import java.util.MapIterator; import java.util.Mapping; -import java.util.functions.BiSink; -import java.util.functions.Sink; +import java.util.streams.Sink; import java.util.streams.Stream; /** @@ -36,7 +35,7 @@ * * @author Brian Goetz */ -public class MapExtractKeysOp<K,V> implements StatelessOp<Mapping<K,V>, K> { +public class MapExtractKeysOp<K, V> implements StatelessOp<Mapping<K, V>, K> { @Override public int getStreamFlags(int upstreamState) { return upstreamState & ~Stream.STATE_UNKNOWN_MASK_V1; @@ -44,7 +43,7 @@ @Override public Iterator<K> iterator(Iterator<Mapping<K, V>> in) { - final MapIterator<K,V> mapIterator = (MapIterator<K, V>) in; + final MapIterator<K, V> mapIterator = (MapIterator<K, V>) in; return new Iterator<K>() { @Override public boolean hasNext() { @@ -59,17 +58,11 @@ } @Override - public BiSink<K, V> sink(final Sink<? super K> sink) { - return new BiSink<K, V>() { + public Sink<Mapping<K, V>, K, V> sink(final Sink sink) { + return new Sink.ChainedMap<K, V>(sink) { @Override public void accept(K k, V v) { - sink.accept(k); - } - - @Override - public void accept(Mapping<K, V> entry) { - System.out.println("Warning: accept(Entry) called"); - sink.accept(entry.getKey()); + downstream.accept(k); } }; }
--- a/src/share/classes/java/util/streams/ops/MapExtractValuesOp.java Thu Aug 30 11:42:29 2012 -0400 +++ b/src/share/classes/java/util/streams/ops/MapExtractValuesOp.java Thu Aug 30 19:40:53 2012 -0400 @@ -27,8 +27,7 @@ import java.util.Iterator; import java.util.MapIterator; import java.util.Mapping; -import java.util.functions.BiSink; -import java.util.functions.Sink; +import java.util.streams.Sink; import java.util.streams.Stream; /** @@ -36,7 +35,7 @@ * * @author Brian Goetz */ -public class MapExtractValuesOp<K,V> implements StatelessOp<Mapping<K,V>, V> { +public class MapExtractValuesOp<K, V> implements StatelessOp<Mapping<K, V>, V> { @Override public int getStreamFlags(int upstreamState) { return upstreamState & ~Stream.STATE_UNKNOWN_MASK_V1; @@ -44,7 +43,7 @@ @Override public Iterator<V> iterator(Iterator<Mapping<K, V>> in) { - final MapIterator<K,V> mapIterator = (MapIterator<K, V>) in; + final MapIterator<K, V> mapIterator = (MapIterator<K, V>) in; return new Iterator<V>() { @Override public boolean hasNext() { @@ -59,17 +58,11 @@ } @Override - public BiSink<K, V> sink(final Sink<? super V> sink) { - return new BiSink<K, V>() { + public Sink<Mapping<K, V>, K, V> sink(final Sink sink) { + return new Sink.ChainedMap<K, V>(sink) { @Override public void accept(K k, V v) { - sink.accept(v); - } - - @Override - public void accept(Mapping<K, V> entry) { - System.out.println("Warning: accept(Entry) called"); - sink.accept(entry.getValue()); + downstream.accept(v); } }; }
--- a/src/share/classes/java/util/streams/ops/MapFilterKeysOp.java Thu Aug 30 11:42:29 2012 -0400 +++ b/src/share/classes/java/util/streams/ops/MapFilterKeysOp.java Thu Aug 30 19:40:53 2012 -0400 @@ -27,9 +27,8 @@ import java.util.Iterator; import java.util.Mapping; import java.util.Objects; -import java.util.functions.BiSink; import java.util.functions.Predicate; -import java.util.functions.Sink; +import java.util.streams.Sink; import java.util.streams.Stream; /** @@ -37,7 +36,7 @@ * * @author Brian Goetz */ -public class MapFilterKeysOp<K, V> implements StatelessOp<Mapping<K,V>, Mapping<K,V>> { +public class MapFilterKeysOp<K, V> implements StatelessOp<Mapping<K, V>, Mapping<K, V>> { private final Predicate<? super K> predicate; public MapFilterKeysOp(Predicate<? super K> predicate) { @@ -55,24 +54,14 @@ } @Override - public BiSink<K,V> sink(final Sink<? super Mapping<K, V>> sink) { - if (!(sink instanceof BiSink)) { - throw new IllegalStateException("Expecting BiSink"); - } - final BiSink<K, V> biSink = (BiSink<K, V>) (Sink) sink; - return new BiSink<K,V>() { + public Sink<Mapping<K, V>, K, V> sink(final Sink sink) { + return new Sink.ChainedMap<K, V>(sink) { @Override public void accept(K k, V v) { if (predicate.test(k)) { - biSink.accept(k, v); + downstream.accept(k, v); } } - - @Override - public void accept(Mapping<K, V> entry) { - System.out.println("Warning: executing push(Entry)"); - accept(entry.getKey(), entry.getValue()); - } }; } }
--- a/src/share/classes/java/util/streams/ops/MapFilterValuesOp.java Thu Aug 30 11:42:29 2012 -0400 +++ b/src/share/classes/java/util/streams/ops/MapFilterValuesOp.java Thu Aug 30 19:40:53 2012 -0400 @@ -27,9 +27,8 @@ import java.util.Iterator; import java.util.Mapping; import java.util.Objects; -import java.util.functions.BiSink; import java.util.functions.Predicate; -import java.util.functions.Sink; +import java.util.streams.Sink; import java.util.streams.Stream; /** @@ -55,24 +54,14 @@ } @Override - public BiSink<K,V> sink(final Sink<? super Mapping<K, V>> sink) { - if (!(sink instanceof BiSink)) { - throw new IllegalStateException("Expecting BiSink"); - } - final BiSink<K, V> biSink = (BiSink<K, V>) (Sink) sink; - return new BiSink<K,V>() { + public Sink<Mapping<K, V>, K, V> sink(Sink sink) { + return new Sink.ChainedMap<K, V>(sink) { @Override public void accept(K k, V v) { if (predicate.test(v)) { - biSink.accept(k, v); + downstream.accept(k, v); } } - - @Override - public void accept(Mapping<K, V> entry) { - System.out.println("Warning: executing push(Entry)"); - accept(entry.getKey(), entry.getValue()); - } }; } }
--- a/src/share/classes/java/util/streams/ops/MapMapValuesOp.java Thu Aug 30 11:42:29 2012 -0400 +++ b/src/share/classes/java/util/streams/ops/MapMapValuesOp.java Thu Aug 30 19:40:53 2012 -0400 @@ -27,9 +27,8 @@ import java.util.Iterator; import java.util.Mapping; import java.util.Objects; -import java.util.functions.BiSink; import java.util.functions.Mapper; -import java.util.functions.Sink; +import java.util.streams.Sink; import java.util.streams.Stream; /** @@ -59,21 +58,11 @@ } @Override - public BiSink<K,V> sink(final Sink<? super Mapping<K, U>> sink) { - if (!(sink instanceof BiSink)) { - throw new IllegalStateException("Expecting BiSink"); - } - final BiSink<K, U> biSink = (BiSink<K, U>) (Sink) sink; - return new BiSink<K,V>() { + public Sink<Mapping<K, V>, K, V> sink(Sink sink) { + return new Sink.ChainedMap<K,V>(sink) { @Override public void accept(K k, V v) { - biSink.accept(k, mapper.map(v)); - } - - @Override - public void accept(Mapping<K, V> entry) { - System.out.println("Warning: executing push(Entry)"); - accept(entry.getKey(), entry.getValue()); + downstream.accept(k, mapper.map(v)); } }; }
--- a/src/share/classes/java/util/streams/ops/MapOp.java Thu Aug 30 11:42:29 2012 -0400 +++ b/src/share/classes/java/util/streams/ops/MapOp.java Thu Aug 30 19:40:53 2012 -0400 @@ -27,7 +27,7 @@ import java.util.Iterator; import java.util.Objects; import java.util.functions.Mapper; -import java.util.functions.Sink; +import java.util.streams.Sink; import java.util.streams.Stream; /** @@ -54,12 +54,11 @@ } @Override - public Sink<T> sink(final Sink<? super R> sink) { - Objects.requireNonNull(sink); - return new Sink<T>() { + public Sink<T, ?, ?> sink(Sink sink) { + return new Sink.ChainedLinear<T>(sink) { @Override public void accept(T t) { - sink.accept(mapper.map(t)); + downstream.accept(mapper.map(t)); } }; }
--- a/src/share/classes/java/util/streams/ops/MapSortedOp.java Thu Aug 30 11:42:29 2012 -0400 +++ b/src/share/classes/java/util/streams/ops/MapSortedOp.java Thu Aug 30 19:40:53 2012 -0400 @@ -25,15 +25,15 @@ package java.util.streams.ops; import java.util.*; -import java.util.functions.Sink; -import java.util.streams.StatefulSink; +import java.util.streams.Sink; import java.util.streams.Stream; /** * An operation which sorts elements. * - * @param <T> Type of elements to be sorted. + * @param <K> Type of keys to be sorted + * @param <V> Values associated with keys to be sorted * */ public class MapSortedOp<K extends Comparable<? super K>,V> implements StatefulOp<Mapping<K,V>,Mapping<K,V>,Void> { @@ -68,8 +68,8 @@ } @Override - public StatefulSink<Mapping<K, V>, Void> sink(Sink<? super Mapping<K, V>> sink) { - return new StatefulSink<Mapping<K,V>, Void>() { + public Sink<Mapping<K, V>, K, V> sink(Sink sink) { + return new Sink.ChainedMap<K, V>(sink) { PriorityQueue<Mapping<K,V>> pq; @Override public void begin(int size) { @@ -77,11 +77,15 @@ } @Override - public Void end() { + public void end() { while (!pq.isEmpty()) { - sink.accept(pq.remove()); + downstream.accept(pq.remove()); } - return null; + } + + @Override + public void accept(K k, V v) { + // @@@ Do something here! } @Override
--- a/src/share/classes/java/util/streams/ops/MapSwapOp.java Thu Aug 30 11:42:29 2012 -0400 +++ b/src/share/classes/java/util/streams/ops/MapSwapOp.java Thu Aug 30 19:40:53 2012 -0400 @@ -26,8 +26,7 @@ import java.util.Iterator; import java.util.Mapping; -import java.util.functions.BiSink; -import java.util.functions.Sink; +import java.util.streams.Sink; import java.util.streams.Stream; /** @@ -78,21 +77,11 @@ } @Override - public BiSink<K, V> sink(final Sink<? super Mapping<V, K>> sink) { - if (!(sink instanceof BiSink)) { - throw new IllegalStateException("Expecting BiSink"); - } - final BiSink<V, K> biSink = (BiSink<V, K>)(Sink)sink; - return new BiSink<K, V>() { + public Sink<Mapping<K, V>, K, V> sink(Sink sink) { + return new Sink.ChainedMap<K,V>(sink) { @Override public void accept(K k, V v) { - biSink.accept(v, k); - } - - @Override - public void accept(Mapping<K, V> entry) { - System.out.println("Warning: executing accept(Entry)"); - accept(entry.getKey(), entry.getValue()); + downstream.accept(v, k); } }; }
--- a/src/share/classes/java/util/streams/ops/MappedOp.java Thu Aug 30 11:42:29 2012 -0400 +++ b/src/share/classes/java/util/streams/ops/MappedOp.java Thu Aug 30 19:40:53 2012 -0400 @@ -25,9 +25,8 @@ package java.util.streams.ops; import java.util.*; -import java.util.functions.BiSink; import java.util.functions.Mapper; -import java.util.functions.Sink; +import java.util.streams.Sink; import java.util.streams.Stream; /** @@ -114,14 +113,11 @@ } @Override - public Sink<T> sink(final Sink<? super Mapping<T, R>> sink) { - if (!(sink instanceof BiSink)) { - throw new ClassCastException("Expecting BiSink"); - } - final BiSink<T, R> biSink = (BiSink<T, R>) (BiSink) sink; - return new Sink<T>() { + public Sink<T,?,?> sink(final Sink sink) { + return new Sink.ChainedLinear<T>(sink) { + @Override public void accept(T t) { - biSink.accept(t, mapper.map(t)); + downstream.accept(t, mapper.map(t)); } }; }
--- a/src/share/classes/java/util/streams/ops/ParallelOp.java Thu Aug 30 11:42:29 2012 -0400 +++ b/src/share/classes/java/util/streams/ops/ParallelOp.java Thu Aug 30 19:40:53 2012 -0400 @@ -27,7 +27,7 @@ import java.util.Iterator; import java.util.Spliterator; import java.util.concurrent.ForkJoinTask; -import java.util.functions.Sink; +import java.util.streams.Sink; /** * ParallelOp @@ -42,7 +42,7 @@ public interface ParallelOpHelper<T, V> { public int suggestDepth(); public Spliterator<V> spliterator(); - public Sink<V> sink(Sink<? super T> sink); + public Sink<V, ?, ?> sink(Sink sink); public Iterator<T> iterator(); public<Z> Z invoke(ForkJoinTask<Z> task); }
--- a/src/share/classes/java/util/streams/ops/SeedlessFoldOp.java Thu Aug 30 11:42:29 2012 -0400 +++ b/src/share/classes/java/util/streams/ops/SeedlessFoldOp.java Thu Aug 30 19:40:53 2012 -0400 @@ -68,7 +68,7 @@ } @Override - public Optional<T> end() { + public Optional<T> getAndClearState() { Optional<T> result = first ? Optional.<T>empty() : new Optional<>(state); state = null; return result;
--- a/src/share/classes/java/util/streams/ops/SortedOp.java Thu Aug 30 11:42:29 2012 -0400 +++ b/src/share/classes/java/util/streams/ops/SortedOp.java Thu Aug 30 19:40:53 2012 -0400 @@ -25,8 +25,7 @@ package java.util.streams.ops; import java.util.*; -import java.util.functions.Sink; -import java.util.streams.StatefulSink; +import java.util.streams.Sink; import java.util.streams.Stream; @@ -64,20 +63,20 @@ } @Override - public StatefulSink<T, Void> sink(final Sink<? super T> sink) { - return new StatefulSink<T, Void>() { + public Sink<T, ?, ?> sink(Sink sink) { + return new Sink.ChainedLinear<T>(sink) { PriorityQueue<T> pq; @Override public void begin(int size) { pq = new PriorityQueue<>(size > 0 ? size : DEFAULT_PRIORITY_QUEUE_SIZE, comparator); + downstream.begin(size); } @Override - public Void end() { - while (!pq.isEmpty()) { - sink.accept(pq.remove()); - } - return null; + public void end() { + while (!pq.isEmpty()) + downstream.accept(pq.remove()); + downstream.end(); } @Override
--- a/src/share/classes/java/util/streams/ops/StatefulOp.java Thu Aug 30 11:42:29 2012 -0400 +++ b/src/share/classes/java/util/streams/ops/StatefulOp.java Thu Aug 30 19:40:53 2012 -0400 @@ -1,7 +1,6 @@ package java.util.streams.ops; import java.util.*; -import java.util.functions.*; import java.util.streams.*; /** @@ -15,7 +14,7 @@ public interface StatefulOp<T, U, V> extends IntermediateOp<T,U>, ParallelOp<T, TreeUtils.Node<U>> { @Override - public abstract StatefulSink<T, V> sink(Sink<? super U> sink); + public abstract Sink<T, ?, ?> sink(Sink sink); @Override public boolean isStateful() default { @@ -26,12 +25,11 @@ <Z> TreeUtils.Node<U> computeParallel(ParallelOpHelper<T, Z> helper) default { // dumb default serial implementation final StreamBuilder<U> sb = StreamBuilders.make(); - StatefulSink<T, V> sSink = sink(sb); - Sink<Z> sink = helper.sink(sSink); + Sink<Z, ?, ?> sink = helper.sink(sink(sb)); Spliterator<Z> spliterator = helper.spliterator(); - sSink.begin(spliterator.getRemainingSizeIfKnown()); + sink.begin(spliterator.getRemainingSizeIfKnown()); spliterator.forEach(sink); - sSink.end(); + sink.end(); return TreeUtils.node(sb); } }
--- a/src/share/classes/java/util/streams/ops/TerminalOp.java Thu Aug 30 11:42:29 2012 -0400 +++ b/src/share/classes/java/util/streams/ops/TerminalOp.java Thu Aug 30 19:40:53 2012 -0400 @@ -26,7 +26,7 @@ import java.util.Iterator; import java.util.Spliterator; -import java.util.functions.Sink; +import java.util.streams.Sink; import java.util.streams.StatefulSink; /** @@ -47,7 +47,8 @@ sink.begin(-1); while (iterator.hasNext()) sink.accept(iterator.next()); - return sink.end(); + sink.end(); + return sink.getAndClearState(); } public StatefulSink<T, U> sink(); @@ -57,15 +58,12 @@ @Override <V> U computeParallel(ParallelOpHelper<T, V> helper) default { // dumb default serial version - StatefulSink<T, U> sSink = sink(); - Sink<V> vSink = helper.sink(sSink); + StatefulSink<T, U> toArraySink = sink(); + Sink<V, ?, ?> sink = helper.sink(toArraySink); Spliterator<V> spliterator = helper.spliterator(); - sSink.begin(spliterator.getRemainingSizeIfKnown()); - // @@@ This is ridiculous! Integrate Sink/StatefulSink functionality in a less hacky way - if (sSink == vSink) - spliterator.forEach(vSink); - else - TreeUtils.forEach(spliterator, vSink); - return sSink.end(); + sink.begin(spliterator.getRemainingSizeIfKnown()); + spliterator.forEach(sink); + sink.end(); + return toArraySink.getAndClearState(); } }
--- a/src/share/classes/java/util/streams/ops/ToArrayOp.java Thu Aug 30 11:42:29 2012 -0400 +++ b/src/share/classes/java/util/streams/ops/ToArrayOp.java Thu Aug 30 19:40:53 2012 -0400 @@ -66,15 +66,11 @@ } @Override - public Object[] end() { - Object[] result; - if (count == elements.length) { - result = elements; - } else { - result = Arrays.copyOf(elements, count); - } + public Object[] getAndClearState() { + Object[] result = (count == elements.length) + ? elements + : Arrays.copyOf(elements, count); elements = null; - return result; } };
--- a/src/share/classes/java/util/streams/ops/TreeUtils.java Thu Aug 30 11:42:29 2012 -0400 +++ b/src/share/classes/java/util/streams/ops/TreeUtils.java Thu Aug 30 19:40:53 2012 -0400 @@ -27,7 +27,7 @@ import java.util.*; import java.util.concurrent.RecursiveAction; import java.util.concurrent.RecursiveTask; -import java.util.functions.Sink; +import java.util.functions.Block; import java.util.streams.*; /** @@ -40,21 +40,6 @@ throw new Error("no instances"); } - // @@@ This method is a hack, in that helper.sink() might serve up a stateful sink. - // This logic should be more tightly reflected in the helper rather than expecting every client to - // do instanceof and casts. - static<T> void forEach(Spliterator<T> spliterator, Sink<? super T> sink) { - if (sink instanceof StatefulSink) { - StatefulSink<T, ?> sSink = (StatefulSink<T, ?>) sink; - sSink.begin(spliterator.getRemainingSizeIfKnown()); - spliterator.forEach(sSink); - sSink.end(); - } - else { - spliterator.forEach(sink); - } - } - public static<T, U> Node<T> collect(ParallelOp.ParallelOpHelper<T, U> helper, boolean flattenLeaves, boolean flattenTree) { @@ -68,12 +53,18 @@ // Need to account for SIZED flag from pipeline if (size != -1 && splitSizesKnown) { builder = StreamBuilders.makeFixed(size); - forEach(spliterator, helper.sink(builder)); + Sink sink = helper.sink(builder); + sink.begin(spliterator.getRemainingSizeIfKnown()); + spliterator.forEach(sink); + sink.end(); return node((T[]) builder.toArray()); } else { builder = StreamBuilders.make(); - forEach(spliterator, helper.sink(builder)); + Sink sink = helper.sink(builder); + sink.begin(spliterator.getRemainingSizeIfKnown()); + spliterator.forEach(sink); + sink.end(); return node(builder); } } @@ -227,9 +218,9 @@ } @Override - public void forEach(Sink<? super T> sink) { + public void forEach(Block<? super T> block) { for (T t : data) - sink.accept(t); + block.apply(t); } @Override @@ -274,8 +265,8 @@ } @Override - public void forEach(Sink<? super T> sink) { - data.forEach(sink); + public void forEach(Block<? super T> block) { + data.forEach(block); } @Override @@ -343,9 +334,9 @@ } @Override - public void forEach(Sink<? super T> sink) { - left.forEach(sink); - right.forEach(sink); + public void forEach(Block<? super T> block) { + left.forEach(block); + right.forEach(block); } @Override @@ -408,14 +399,14 @@ } @Override - public void forEach(Sink<? super T> sink) { + public void forEach(Block<? super T> block) { if (iterator == null) { - cur.forEach(sink); + cur.forEach(block); iterator = Collections.emptyIterator(); } else { while (iterator.hasNext()) - sink.accept(iterator.next()); + block.apply(iterator.next()); } }
--- a/src/share/classes/java/util/streams/ops/UniqOp.java Thu Aug 30 11:42:29 2012 -0400 +++ b/src/share/classes/java/util/streams/ops/UniqOp.java Thu Aug 30 19:40:53 2012 -0400 @@ -25,8 +25,7 @@ package java.util.streams.ops; import java.util.*; -import java.util.functions.Sink; -import java.util.streams.StatefulSink; +import java.util.streams.Sink; import java.util.streams.Stream; /** @@ -54,25 +53,26 @@ } @Override - public StatefulSink<T, Void> sink(final Sink<? super T> sink) { - return new StatefulSink<T, Void>() { + public Sink<T, ?, ?> sink(Sink sink) { + return new Sink.ChainedLinear<T>(sink) { Set<T> seen; @Override public void begin(int size) { seen = new HashSet<>(); + downstream.begin(size); } @Override - public Void end() { + public void end() { seen = null; - return null; + downstream.end(); } @Override public void accept(T t) { if (seen.add(t)) { - sink.accept(t); + downstream.accept(t); } } };
--- a/test-ng/tests/org/openjdk/tests/java/util/IteratorsNullTest.java Thu Aug 30 11:42:29 2012 -0400 +++ b/test-ng/tests/org/openjdk/tests/java/util/IteratorsNullTest.java Thu Aug 30 19:40:53 2012 -0400 @@ -24,17 +24,16 @@ */ package org.openjdk.tests.java.util; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Factory; +import org.testng.annotations.Test; + import java.util.*; import java.util.functions.*; import java.util.streams.ops.FilterOp; import java.util.streams.ops.MapOp; import java.util.streams.ops.SortedOp; -import org.testng.annotations.DataProvider; -import org.testng.annotations.Factory; -import org.testng.annotations.Test; - - import static org.openjdk.tests.java.util.LambdaTestHelpers.*; /** @@ -43,7 +42,7 @@ @Test(groups = { "null", "lambda" }) public class IteratorsNullTest extends NullArgsTestCase { @Factory(dataProvider = "data") - public IteratorsNullTest(String name, Sink<Object[]> sink, Object[] args) { + public IteratorsNullTest(String name, Block<Object[]> sink, Object[] args) { super(name, sink, args); } @@ -52,64 +51,64 @@ public static Object[][] makeData() { List<Integer> list = countTo(10); return new Object[][] { - { "FilterOp.iterator", new Sink<Object[]>() { - public void accept(Object[] args) { + { "FilterOp.iterator", new Block<Object[]>() { + public void apply(Object[] args) { FilterOp.iterator((Iterator<Integer>) args[0], (Predicate<Integer>) args[1]); } }, new Object[] { list.iterator(), pTrue } }, - { "MapOp.iterator", new Sink<Object[]>() { - public void accept(Object[] args) { + { "MapOp.iterator", new Block<Object[]>() { + public void apply(Object[] args) { MapOp.iterator((Iterator<Integer>) args[0], (Mapper<Integer, Integer>) args[1]); } }, new Object[] { list.iterator(), mDoubler } }, - {"Iterators.reduce", new Sink<Object[]>() { - public void accept(Object[] args) { + {"Iterators.reduce", new Block<Object[]>() { + public void apply(Object[] args) { Iterators.<Integer>reduce((Iterator<Integer>) args[0], 0, (BinaryOperator<Integer>) args[1]); } }, new Object[]{list.iterator(), rPlus} }, - { "Iterators.mapReduce", new Sink<Object[]>() { - public void accept(Object[] args) { + { "Iterators.mapReduce", new Block<Object[]>() { + public void apply(Object[] args) { Iterators.<Integer, Integer>mapReduce((Iterator<Integer>) args[0], (Mapper<Integer, Integer>) args[1], 0, (BinaryOperator<Integer>) args[2]); } }, new Object[] { list.iterator(), mDoubler, rPlus } }, - { "Iterators.mapReduce(int)", new Sink<Object[]>() { - public void accept(Object[] args) { + { "Iterators.mapReduce(int)", new Block<Object[]>() { + public void apply(Object[] args) { Iterators.mapReduce((Iterator<Integer>) args[0], (IntMapper<Integer>) args[1], 0, (IntBinaryOperator) args[2]); } }, new Object[] { list.iterator(), imDoubler, irPlus } }, - { "Iterators.mapReduce(long)", new Sink<Object[]>() { - public void accept(Object[] args) { + { "Iterators.mapReduce(long)", new Block<Object[]>() { + public void apply(Object[] args) { Iterators.mapReduce((Iterator<Integer>) args[0], (LongMapper<Integer>) args[1], 0, (LongBinaryOperator) args[2]); } }, new Object[] { asLongs(list).iterator(), lmDoubler, lrPlus } }, - { "Iterators.mapReduce(double)", new Sink<Object[]>() { - public void accept(Object[] args) { + { "Iterators.mapReduce(double)", new Block<Object[]>() { + public void apply(Object[] args) { Iterators.mapReduce((Iterator<Integer>) args[0], (DoubleMapper<Integer>) args[1], 0, (DoubleBinaryOperator) args[2]); } }, new Object[] { asDoubles(list).iterator(), dmDoubler, drPlus } }, - { "SortedOp.iterator", new Sink<Object[]>() { - public void accept(Object[] args) { + { "SortedOp.iterator", new Block<Object[]>() { + public void apply(Object[] args) { SortedOp.iterator((Iterator<Integer>) args[0], Comparators.<Integer>naturalOrder()); } }, new Object[] { list.iterator() } }, - { "Iterators.sorted(Comparator)", new Sink<Object[]>() { - public void accept(Object[] args) { + { "Iterators.sorted(Comparator)", new Block<Object[]>() { + public void apply(Object[] args) { SortedOp.iterator((Iterator<Integer>) args[0], (Comparator) args[1]); } }, new Object[] { list.iterator(), cInteger }
--- a/test-ng/tests/org/openjdk/tests/java/util/LambdaTestHelpers.java Thu Aug 30 11:42:29 2012 -0400 +++ b/test-ng/tests/org/openjdk/tests/java/util/LambdaTestHelpers.java Thu Aug 30 19:40:53 2012 -0400 @@ -41,9 +41,9 @@ public static final Mapper<Integer, Integer> mZero = x -> 0; public static final Mapper<Integer, Integer> mId = x -> x; public static final Mapper<Integer, Integer> mDoubler = x -> x * 2; - public static final FlatMapper<Integer, Integer> mfId = /*@@@ Sink::accept*/ (s,e) -> { s.accept(e); }; + public static final FlatMapper<Integer, Integer> mfId = /*@@@ Sink::accept*/ (s,e) -> { s.apply(e); }; public static final FlatMapper<Integer, Integer> mfNull = (s, e) -> { }; - public static final FlatMapper<Integer, Integer> mfLt = (s, e) -> { for (int i=0; i<e; i++) s.accept(i); }; + public static final FlatMapper<Integer, Integer> mfLt = (s, e) -> { for (int i=0; i<e; i++) s.apply(i); }; public static final IntMapper<Integer> imDoubler = x -> x * 2; public static final LongMapper<Long> lmDoubler = x -> x * 2; public static final DoubleMapper<Double> dmDoubler = x -> x * 2; @@ -61,14 +61,14 @@ public static final FlatMapper<String, Character> flattenChars = new FlatMapper<String, Character>() { @Override - public void flatMapInto(Sink<? super Character> sink, String element) { + public void flatMapInto(Block<? super Character> sink, String element) { for (int i=0; i<element.length(); i++) { - sink.accept(element.charAt(i)); + sink.apply(element.charAt(i)); } } }; - public static List<Integer> nullStream() { + public static List<Integer> empty() { ArrayList<Integer> list = new ArrayList<>(); list.add(null); return list; @@ -199,6 +199,7 @@ assertTrue(!pI.hasNext()); } + @SafeVarargs public static<T> void assertContents(Iterator<T> pi, T... values) { assertContents(pi, Arrays.asList(values).iterator()); } @@ -247,35 +248,12 @@ } } - private static <T> Iterable<Iterable<T>> splitHelper(Spliterator<T> s, int depth, List<Iterable<T>> iterables) { - if (depth == 0) { - List<T> list = new ArrayList<>(); - while (s.hasNext()) { - list.add(s.next()); - } - iterables.add(list); - } - else { - splitHelper(s.split(), depth-1, iterables); - splitHelper(s, depth-1, iterables); - } - return iterables; - } - - public static <T> Sink<T> iteratorToSink(Iterator<? extends T> iterator, Sink<? super T> sink) { - while(iterator.hasNext()) { - sink.accept(iterator.next()); - } - - return (Sink<T>) sink; - } - public static <T,V> V iteratorToStatefulSink(Iterator<? extends T> iterator, StatefulSink<? super T, ? extends V> sink) { sink.begin(-1); while(iterator.hasNext()) { sink.accept(iterator.next()); } - - return sink.end(); + sink.end(); + return sink.getAndClearState(); } }
--- a/test-ng/tests/org/openjdk/tests/java/util/NullArgsTestCase.java Thu Aug 30 11:42:29 2012 -0400 +++ b/test-ng/tests/org/openjdk/tests/java/util/NullArgsTestCase.java Thu Aug 30 19:40:53 2012 -0400 @@ -24,10 +24,10 @@ */ package org.openjdk.tests.java.util; -import java.util.Arrays; import org.testng.annotations.Test; -import java.util.functions.Sink; +import java.util.Arrays; +import java.util.functions.Block; import static org.testng.Assert.fail; @@ -41,10 +41,10 @@ */ public abstract class NullArgsTestCase { public final String name; - public final Sink<Object[]> sink; + public final Block<Object[]> sink; public final Object[] args; - protected NullArgsTestCase(String name, Sink<Object[]> sink, Object[] args) { + protected NullArgsTestCase(String name, Block<Object[]> sink, Object[] args) { this.name = name; this.sink = sink; this.args = args; @@ -52,7 +52,7 @@ @Test public void goodNonNull() { - sink.accept(args); + sink.apply(args); } @Test @@ -61,7 +61,7 @@ Object[] temp = Arrays.copyOf(args, args.length); temp[i] = null; try { - sink.accept(temp); + sink.apply(temp); fail(String.format("Expected NullPointerException for argument %d of test case %s", i, name)); } catch (NullPointerException e) {
--- a/test-ng/tests/org/openjdk/tests/java/util/streams/ops/FlatMapOpTest.java Thu Aug 30 11:42:29 2012 -0400 +++ b/test-ng/tests/org/openjdk/tests/java/util/streams/ops/FlatMapOpTest.java Thu Aug 30 19:40:53 2012 -0400 @@ -41,12 +41,12 @@ */ @Test public class FlatMapOpTest extends StreamOpTestCase { - private static final FlatMapper<Integer, Integer> mfIntToBits = (sink, integer) -> { + private static final FlatMapper<Integer, Integer> mfIntToBits = (block, integer) -> { int num = integer; for (int i = 0; i < 16 && num != 0; i++) { if ((num & (1 << i)) != 0) { num &= ~(1 << i); - sink.accept(i); + block.apply(i); } } };
--- a/test-ng/tests/org/openjdk/tests/java/util/streams/ops/StreamOpTestCase.java Thu Aug 30 11:42:29 2012 -0400 +++ b/test-ng/tests/org/openjdk/tests/java/util/streams/ops/StreamOpTestCase.java Thu Aug 30 19:40:53 2012 -0400 @@ -32,7 +32,6 @@ import java.util.concurrent.ForkJoinUtils; import java.util.functions.BiPredicate; import java.util.functions.Block; -import java.util.functions.Sink; import java.util.streams.*; import java.util.streams.ops.IntermediateOp; import java.util.streams.ops.ParallelOp; @@ -47,7 +46,7 @@ @Test public abstract class StreamOpTestCase extends Assert { - protected static<T> void assertMatches(StreamResult<T> refResult, Block<Sink<T>> block) { + protected static<T> void assertMatches(StreamResult<T> refResult, Block<Sink> block) { StreamResult<T> newResult = new StreamResult<>(); block.apply(newResult); assertEquals(refResult, newResult); @@ -80,24 +79,19 @@ // Second pass -- create a sink and wrap it assertMatches(refResult, sink -> { - Sink<T> wrapped = sink(sink, ops); - if (wrapped instanceof StatefulSink) { - StatefulSink<T, ?> stateful = (StatefulSink<T, ?>)wrapped; - stateful.begin(-1); - data.forEach(stateful); - stateful.end(); - } else { - data.forEach(wrapped); - } + Sink<T, ?, ?> wrapped = sink(sink, ops); + wrapped.begin(-1); + data.forEach(wrapped); + wrapped.end(); }); // Third pass -- wrap with SequentialPipeline.op, and iterate in push mode - assertMatches(refResult, sink -> { stream(data.seq(ops)).forEach((Sink) sink); }); + assertMatches(refResult, (Sink sink) -> { stream(data.seq(ops)).forEach(sink); }); // Wrap as stream, and iterate in pull mode assertMatches(refResult, sink -> { for (Iterator<?> seqIter = data.seq(ops).iterator(); seqIter.hasNext(); ) - sink.accept((U) seqIter.next()); + sink.accept(seqIter.next()); }); // Wrap as stream, and iterate in mixed mode @@ -105,23 +99,24 @@ Stream<?> stream = stream(data.seq(ops)); Iterator<?> iter = stream.iterator(); if (iter.hasNext()) - sink.accept((U) iter.next()); - stream.forEach((Sink) sink); + sink.accept(iter.next()); + stream.forEach(sink); }); // Wrap as parallel stream + sequential - assertMatches(refResult, sink -> { stream(data.par(ops)).sequential().forEach((Sink) sink); }); + assertMatches(refResult, (Sink sink) -> { stream(data.par(ops)).sequential().forEach(sink); }); // Wrap as parallel stream + toArray assertMatches(refResult, sink -> { for (Object t : stream(data.par(ops)).toArray()) - sink.accept((U) t); + sink.accept(t); }); // Wrap as parallel stream + into assertMatches(refResult, sink -> { - for (Object u : stream(data.par(ops)).sequential().into(new ArrayList())) - sink.accept((U) u); + ArrayList list = stream(data.par(ops)).sequential().into(new ArrayList()); + for (Object u : list) + sink.accept(u); }); // More ways to iterate the PSS: iterate result of op @@ -153,13 +148,15 @@ StatefulSink<T, U> sink = terminalOp.sink(); sink.begin(-1); data.forEach(sink); - answer = sink.end(); + sink.end(); + answer = sink.getAndClearState(); // Create a sink and evaluate, with size advice StatefulSink<T, U> sink2 = terminalOp.sink(); sink2.begin(data.size()); data.forEach(sink2); - U answer2 = sink2.end(); + sink2.end(); + U answer2 = sink2.getAndClearState(); assertTrue(equalator.test(answer, answer2)); } @@ -191,7 +188,7 @@ } } - static class StreamResult<T> implements Sink<T>, Traversable<T>, Sized { + static class StreamResult<T> implements Sink.OfLinear<T>, Traversable<T>, Sized { private Object[] array; @@ -207,9 +204,9 @@ } @Override - public void forEach(Sink<? super T> sink) { + public void forEach(Block<? super T> block) { for (int i = 0; i < offset; i++) { - sink.accept((T) array[i]); + block.apply((T) array[i]); } } @@ -262,7 +259,7 @@ } @Override - public Sink sink(Sink sink) { + public Sink<T, ?, ?> sink(Sink sink) { return sink; } @@ -330,7 +327,7 @@ AbstractPipeline<?, T> par(); Iterator<T> iterator(); Spliterator<T> spliterator(); - void forEach(Sink<? super T> sink); + void forEach(Block<? super T> block); int size(); <U> AbstractPipeline<?, U> seq(IntermediateOp<T, U> op) default { @@ -417,9 +414,9 @@ } @Override - public void forEach(Sink<? super T> sink) { + public void forEach(Block<? super T> block) { for (T t : array) - sink.accept(t); + block.apply(t); } @Override @@ -465,8 +462,8 @@ } @Override - public void forEach(Sink<? super T> sink) { - collection.forEach(sink); + public void forEach(Block<? super T> block) { + collection.forEach(block); } @Override
--- a/test-ng/tests/org/openjdk/tests/javac/LambdaTranslationTest1.java Thu Aug 30 11:42:29 2012 -0400 +++ b/test-ng/tests/org/openjdk/tests/javac/LambdaTranslationTest1.java Thu Aug 30 19:40:53 2012 -0400 @@ -26,7 +26,7 @@ import org.testng.annotations.Test; -import java.util.functions.Sink; +import java.util.functions.Block; import static org.testng.Assert.assertEquals; @@ -69,63 +69,63 @@ } public void testLambdas() { - Sink<Object> b = t -> {setResult("Sink0::" + t);}; - b.accept("Howdy"); + Block<Object> b = t -> {setResult("Sink0::" + t);}; + b.apply("Howdy"); assertResult("Sink0::Howdy"); - Sink<String> b1 = t -> {setResult("Sink1::" + t);}; - b1.accept("Rowdy"); + Block<String> b1 = t -> {setResult("Sink1::" + t);}; + b1.apply("Rowdy"); assertResult("Sink1::Rowdy"); for (int i = 5; i < 10; ++i) { - Sink<Integer> b2 = t -> {setResult("Sink2::" + t);}; - b2.accept(i); + Block<Integer> b2 = t -> {setResult("Sink2::" + t);}; + b2.apply(i); assertResult("Sink2::" + i); } - Sink<Integer> b3 = t -> {setResult("Sink3::" + t);}; + Block<Integer> b3 = t -> {setResult("Sink3::" + t);}; for (int i = 900; i > 0; i -= 100) { - b3.accept(i); + b3.apply(i); assertResult("Sink3::" + i); } cntxt = "blah"; - Sink<String> b4 = t -> {setResult(String.format("b4: %s .. %s", cntxt, t));}; - b4.accept("Yor"); + Block<String> b4 = t -> {setResult(String.format("b4: %s .. %s", cntxt, t));}; + b4.apply("Yor"); assertResult("b4: blah .. Yor"); String flaw = "flaw"; - Sink<String> b5 = t -> {setResult(String.format("b5: %s .. %s", flaw, t));}; - b5.accept("BB"); + Block<String> b5 = t -> {setResult(String.format("b5: %s .. %s", flaw, t));}; + b5.apply("BB"); assertResult("b5: flaw .. BB"); cntxt = "flew"; - Sink<String> b6 = t -> {setResult(String.format("b6: %s .. %s .. %s", t, cntxt, flaw));}; - b6.accept("flee"); + Block<String> b6 = t -> {setResult(String.format("b6: %s .. %s .. %s", t, cntxt, flaw));}; + b6.apply("flee"); assertResult("b6: flee .. flew .. flaw"); - Sink<String> b7 = t -> {setResult(String.format("b7: %s %s", t, this.protectedSuperclassMethod()));}; - b7.accept("this:"); + Block<String> b7 = t -> {setResult(String.format("b7: %s %s", t, this.protectedSuperclassMethod()));}; + b7.apply("this:"); assertResult("b7: this: instance:flew"); - Sink<String> b8 = t -> {setResult(String.format("b8: %s %s", t, super.protectedSuperclassMethod()));}; - b8.accept("super:"); + Block<String> b8 = t -> {setResult(String.format("b8: %s %s", t, super.protectedSuperclassMethod()));}; + b8.apply("super:"); assertResult("b8: super: I'm the sub"); - Sink<String> b7b = t -> {setResult(String.format("b9: %s %s", t, protectedSuperclassMethod()));}; - b7b.accept("implicit this:"); + Block<String> b7b = t -> {setResult(String.format("b9: %s %s", t, protectedSuperclassMethod()));}; + b7b.apply("implicit this:"); assertResult("b9: implicit this: instance:flew"); - Sink<Object> b10 = t -> {setResult(String.format("b10: new LT1Thing: %s", (new LT1Thing(t)).str));}; - b10.accept("thing"); + Block<Object> b10 = t -> {setResult(String.format("b10: new LT1Thing: %s", (new LT1Thing(t)).str));}; + b10.apply("thing"); assertResult("b10: new LT1Thing: thing"); - Sink<Object> b11 = t -> {setResult(String.format("b11: %s", (new LT1Thing(t) { + Block<Object> b11 = t -> {setResult(String.format("b11: %s", (new LT1Thing(t) { String get() { return "*" + str.toString() + "*"; } }).get()));}; - b11.accept(999); + b11.apply(999); assertResult("b11: *999*"); } @@ -162,17 +162,17 @@ private int that = 1234; void doInner() { - Sink<String> i4 = t -> {setResult(String.format("i4: %d .. %s", that, t));}; - i4.accept("=1234"); + Block<String> i4 = t -> {setResult(String.format("i4: %d .. %s", that, t));}; + i4.apply("=1234"); assertResult("i4: 1234 .. =1234"); - Sink<String> i5 = t -> {setResult(""); appendResult(t); appendResult(t);}; - i5.accept("fruit"); + Block<String> i5 = t -> {setResult(""); appendResult(t); appendResult(t);}; + i5.apply("fruit"); assertResult("fruitfruit"); cntxt = "human"; - Sink<String> b4 = t -> {setResult(String.format("b4: %s .. %s", cntxt, t));}; - b4.accept("bin"); + Block<String> b4 = t -> {setResult(String.format("b4: %s .. %s", cntxt, t));}; + b4.apply("bin"); assertResult("b4: human .. bin"); final String flaw = "flaw"; @@ -182,17 +182,17 @@ System.out.printf("c5: %s\n", c5.call() ); **/ - Sink<String> b5 = t -> {setResult(String.format("b5: %s .. %s", flaw, t));}; - b5.accept("BB"); + Block<String> b5 = t -> {setResult(String.format("b5: %s .. %s", flaw, t));}; + b5.apply("BB"); assertResult("b5: flaw .. BB"); cntxt = "borg"; - Sink<String> b6 = t -> {setResult(String.format("b6: %s .. %s .. %s", t, cntxt, flaw));}; - b6.accept("flee"); + Block<String> b6 = t -> {setResult(String.format("b6: %s .. %s .. %s", t, cntxt, flaw));}; + b6.apply("flee"); assertResult("b6: flee .. borg .. flaw"); - Sink<String> b7b = t -> {setResult(String.format("b7b: %s %s", t, protectedSuperclassMethod()));}; - b7b.accept("implicit outer this"); + Block<String> b7b = t -> {setResult(String.format("b7b: %s %s", t, protectedSuperclassMethod()));}; + b7b.apply("implicit outer this"); assertResult("b7b: implicit outer this instance:borg"); /**
--- a/test-ng/tests/org/openjdk/tests/separate/SourceModel.java Thu Aug 30 11:42:29 2012 -0400 +++ b/test-ng/tests/org/openjdk/tests/separate/SourceModel.java Thu Aug 30 19:40:53 2012 -0400 @@ -234,8 +234,10 @@ .swap() .into(dependencies); // Do these last so that they override - this.typeDependencies.stream().mapped(x->x.getName()) - .swap().into(dependencies); + this.typeDependencies.stream() + .mapped(x->x.getName()) + .swap() + .into(dependencies); return dependencies.values(); } }