changeset 5880:76ff1fc6f813 it2-bootstrap

Combine various XxxSink classes into two; move Sink out of public API
author briangoetz
date Thu, 30 Aug 2012 19:40:53 -0400
parents 9dd0188c1229
children 794edf7559b2
files src/share/classes/java/lang/CharSequence.java src/share/classes/java/util/Arrays.java src/share/classes/java/util/Collection.java src/share/classes/java/util/Spliterator.java src/share/classes/java/util/Traversable.java src/share/classes/java/util/functions/BiBlock.java src/share/classes/java/util/functions/BiMapper.java src/share/classes/java/util/functions/BiPredicate.java src/share/classes/java/util/functions/BiSink.java src/share/classes/java/util/functions/FlatMapper.java src/share/classes/java/util/functions/Sink.java src/share/classes/java/util/streams/AbstractPipeline.java src/share/classes/java/util/streams/AbstractSequentialStreamAccessor.java src/share/classes/java/util/streams/LinearPipeline.java src/share/classes/java/util/streams/MapPipeline.java src/share/classes/java/util/streams/MapStream.java src/share/classes/java/util/streams/MapStreamAccessor.java src/share/classes/java/util/streams/Sink.java src/share/classes/java/util/streams/StatefulBiSink.java src/share/classes/java/util/streams/StatefulSink.java src/share/classes/java/util/streams/Stream.java src/share/classes/java/util/streams/StreamAccessor.java src/share/classes/java/util/streams/StreamBuilder.java src/share/classes/java/util/streams/StreamBuilders.java src/share/classes/java/util/streams/Streamable.java src/share/classes/java/util/streams/Streams.java src/share/classes/java/util/streams/ops/BiAllMatchOp.java src/share/classes/java/util/streams/ops/BiAnyMatchOp.java src/share/classes/java/util/streams/ops/BiFilterOp.java src/share/classes/java/util/streams/ops/BiMapOp.java src/share/classes/java/util/streams/ops/CumulateOp.java src/share/classes/java/util/streams/ops/FilterOp.java src/share/classes/java/util/streams/ops/FlatMapOp.java src/share/classes/java/util/streams/ops/FoldOp.java src/share/classes/java/util/streams/ops/ForEachOp.java src/share/classes/java/util/streams/ops/GroupByOp.java src/share/classes/java/util/streams/ops/IntermediateOp.java src/share/classes/java/util/streams/ops/MapExtractKeysOp.java src/share/classes/java/util/streams/ops/MapExtractValuesOp.java src/share/classes/java/util/streams/ops/MapFilterKeysOp.java src/share/classes/java/util/streams/ops/MapFilterValuesOp.java src/share/classes/java/util/streams/ops/MapMapValuesOp.java src/share/classes/java/util/streams/ops/MapOp.java src/share/classes/java/util/streams/ops/MapSortedOp.java src/share/classes/java/util/streams/ops/MapSwapOp.java src/share/classes/java/util/streams/ops/MappedOp.java src/share/classes/java/util/streams/ops/ParallelOp.java src/share/classes/java/util/streams/ops/SeedlessFoldOp.java src/share/classes/java/util/streams/ops/SortedOp.java src/share/classes/java/util/streams/ops/StatefulOp.java src/share/classes/java/util/streams/ops/TerminalOp.java src/share/classes/java/util/streams/ops/ToArrayOp.java src/share/classes/java/util/streams/ops/TreeUtils.java src/share/classes/java/util/streams/ops/UniqOp.java test-ng/tests/org/openjdk/tests/java/util/IteratorsNullTest.java test-ng/tests/org/openjdk/tests/java/util/LambdaTestHelpers.java test-ng/tests/org/openjdk/tests/java/util/NullArgsTestCase.java test-ng/tests/org/openjdk/tests/java/util/streams/ops/FlatMapOpTest.java test-ng/tests/org/openjdk/tests/java/util/streams/ops/StreamOpTestCase.java test-ng/tests/org/openjdk/tests/javac/LambdaTranslationTest1.java test-ng/tests/org/openjdk/tests/separate/SourceModel.java
diffstat 61 files changed, 563 insertions(+), 662 deletions(-) [+]
line wrap: on
line diff
--- a/src/share/classes/java/lang/CharSequence.java	Thu Aug 30 11:42:29 2012 -0400
+++ b/src/share/classes/java/lang/CharSequence.java	Thu Aug 30 19:40:53 2012 -0400
@@ -28,7 +28,7 @@
 import java.util.Iterator;
 import java.util.NoSuchElementException;
 import java.util.Traversable;
-import java.util.functions.Sink;
+import java.util.functions.Block;
 import java.util.streams.Stream;
 import java.util.streams.Streams;
 
@@ -152,9 +152,9 @@
     }
 
     @Override
-    public void forEach(Sink<? super Character> sink) {
+    public void forEach(Block<? super Character> block) {
         for (int i = 0; i < cs.length(); i++)
-            sink.accept(cs.charAt(i));
+            block.apply(cs.charAt(i));
     }
 
     public Iterator<Character> iterator() {
@@ -183,11 +183,11 @@
     }
 
     @Override
-    public void forEach(Sink<? super Integer> sink) {
+    public void forEach(Block<? super Integer> block) {
         for (int i = 0; i < cs.length(); ) {
             int cp = Character.codePointAt(cs, i);
             i += Character.charCount(cp);
-            sink.accept(cp);
+            block.apply(cp);
         }
     }
 
--- a/src/share/classes/java/util/Arrays.java	Thu Aug 30 11:42:29 2012 -0400
+++ b/src/share/classes/java/util/Arrays.java	Thu Aug 30 19:40:53 2012 -0400
@@ -3757,9 +3757,9 @@
         }
 
         @Override
-        public void forEach(Sink<? super T> sink) {
+        public void forEach(Block<? super T> block) {
             while (curIndex < endIndex) {
-                sink.accept(array[curIndex++]);
+                block.apply(array[curIndex++]);
             }
         }
 
@@ -3822,7 +3822,7 @@
         }
     }
 
-    private static class ArraySink<T> implements Sink<T> {
+    private static class ArraySink<T> implements Sink.OfLinear<T> {
         private final T[] array;
         private final int offset;
         private final int length;
@@ -3906,11 +3906,11 @@
         return asStreamable(array, 0, array.length);
     }
 
-    public static<T> Sink<T> sink(T[] array) {
+    public static<T> Sink<T, ?, ?> sink(T[] array) {
         return sink(array, 0, array.length);
     }
 
-    public static<T> Sink<T> sink(T[] array, int offset, int length) {
+    public static<T> Sink<T, ?, ?> sink(T[] array, int offset, int length) {
         return new ArraySink<>(array, offset, length);
     }
 
--- a/src/share/classes/java/util/Collection.java	Thu Aug 30 11:42:29 2012 -0400
+++ b/src/share/classes/java/util/Collection.java	Thu Aug 30 19:40:53 2012 -0400
@@ -25,8 +25,8 @@
 
 package java.util;
 
+import java.util.functions.Block;
 import java.util.functions.Predicate;
-import java.util.functions.Sink;
 import java.util.streams.Stream;
 import java.util.streams.Streamable;
 import java.util.streams.Streams;
@@ -495,9 +495,9 @@
     }
 
     @Override
-    void forEach(Sink<? super E> sink) default {
+    void forEach(Block<? super E> block) default {
         for (E e: this) {
-            sink.accept(e);
+            block.apply(e);
         }
     }
 
@@ -506,6 +506,6 @@
         if (stream.isParallel())
             stream = stream.sequential();
         // @@@ Would use this::add but compiler doens't support that yet.
-        stream.forEach((E e)-> { add(e); });
+        stream.forEach((E e) -> { add(e); });
     }
 }
--- a/src/share/classes/java/util/Spliterator.java	Thu Aug 30 11:42:29 2012 -0400
+++ b/src/share/classes/java/util/Spliterator.java	Thu Aug 30 19:40:53 2012 -0400
@@ -24,7 +24,7 @@
  */
 package java.util;
 
-import java.util.functions.Sink;
+import java.util.functions.Block;
 
 /**
  * Spliterator
@@ -47,7 +47,7 @@
     Spliterator<T> split();
 
     /** Process the remaining elements sequentially.  */
-    void forEach(Sink<? super T> sink);
+    void forEach(Block<? super T> sink);
 
     /**
      * Return the number of elements remaining to be processed, if the count can
--- a/src/share/classes/java/util/Traversable.java	Thu Aug 30 11:42:29 2012 -0400
+++ b/src/share/classes/java/util/Traversable.java	Thu Aug 30 19:40:53 2012 -0400
@@ -24,7 +24,7 @@
  */
 package java.util;
 
-import java.util.functions.Sink;
+import java.util.functions.Block;
 
 /**
  * Implementing this interface allows an object to indicate that it provides
@@ -38,10 +38,10 @@
     /**
      * Each element of the object will be provided to the specified Sink.
      *
-     * @param sink The Sink to which elements will be provided.
+     * @param block The Sink to which elements will be provided.
      */
-    public void forEach(Sink<? super T> sink) default {
+    public void forEach(Block<? super T> block) default {
         for (T t : this)
-            sink.accept(t);
+            block.apply(t);
     }
 }
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/share/classes/java/util/functions/BiBlock.java	Thu Aug 30 19:40:53 2012 -0400
@@ -0,0 +1,68 @@
+/*
+ * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.  Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+package java.util.functions;
+
+/**
+ * BiBlock
+ *
+ * @author Brian Goetz
+ */
+/**
+ * Performs operations upon an input object which may modify that object and/or
+ * external state (other objects).
+ *
+ * <p>All block implementations are expected to:
+ * <ul>
+ * <li>When used for aggregate operations upon many elements blocks
+ * should not assume that the {@code apply} operation will be called upon
+ * elements in any specific order.</li>
+ * </ul>
+ *
+ * @param <L> the type of input objects provided to {@code apply}.
+ * @param <R> the type of input objects provided to {@code apply}.
+ */
+public interface BiBlock<L, R> {
+
+    /**
+     * Performs operations upon the provided object which may modify that object
+     * and/or external state.
+     *
+     * @param l an input object
+     */
+    void apply(L l, R r);
+
+    /**
+     * Returns a Block which performs in sequence the {@code apply} methods of
+     * multiple Blocks. This Block's {@code apply} method is performed followed
+     * by the {@code apply} method of the specified Block operation.
+     *
+     * @param other an additional Block which will be chained after this Block
+     * @return a Block which performs in sequence the {@code apply} method of
+     * this Block and the {@code apply} method of the specified Block operation
+     */
+    public BiBlock<L, R> chain(BiBlock<? super L, ? super R> other) default {
+        return (l, r) -> { apply(l, r); other.apply(l, r); };
+    }
+}
--- a/src/share/classes/java/util/functions/BiMapper.java	Thu Aug 30 11:42:29 2012 -0400
+++ b/src/share/classes/java/util/functions/BiMapper.java	Thu Aug 30 19:40:53 2012 -0400
@@ -55,10 +55,10 @@
  *
  * @param <T> the type of input objects provided to the {@code map} operation.
  * @param <U> the type of input objects provided to the {@code map} operation.
- * @param <V> the type of output objects from {@code map} operation. May be the
+ * @param <R> the type of output objects from {@code map} operation. May be the
  * same type as either {@code <T>} or {@code <U>}.
  */
-public interface BiMapper<T, U, V> {
+public interface BiMapper<T, U, R> {
 
     /**
      * Map the provided input objects to an appropriate output object.
@@ -67,20 +67,20 @@
      * @param u another object to be mapped.
      * @return the mapped output object.
      */
-    V map(T t, U u);
+    R map(T t, U u);
 
     /**
      * Combine with another mapper producing a mapper which preforms both
      * mappings.
      *
-     * @param <V> Type of output objects from the combined mapper. May be the
+     * @param <W> Type of output objects from the combined mapper. May be the
      * same type as {@code <U>}.
      * @param after An additional mapping to be applied to the result of this
      * mapping.
      * @return A mapper which performs both the original mapping followed by
      * a second mapping.
      */
-    public <W> BiMapper<T, U, W> compose(Mapper<? super V, ? extends W> after) default {
+    public <W> BiMapper<T, U, W> compose(Mapper<? super R, ? extends W> after) default {
         throw new UnsupportedOperationException("Not yet implemented.");
     }
 }
--- a/src/share/classes/java/util/functions/BiPredicate.java	Thu Aug 30 11:42:29 2012 -0400
+++ b/src/share/classes/java/util/functions/BiPredicate.java	Thu Aug 30 19:40:53 2012 -0400
@@ -52,8 +52,8 @@
  * elements in any specific order.</li>
  * </ul>
  *
- * @param <L> the type of input objects provided to {@code eval}.
- * @param <R> the type of input objects provided to {@code eval}.
+ * @param <L> the type of input objects provided to {@code test}.
+ * @param <R> the type of input objects provided to {@code test}.
  */
 public interface BiPredicate<L,R> {
 
--- a/src/share/classes/java/util/functions/BiSink.java	Thu Aug 30 11:42:29 2012 -0400
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,70 +0,0 @@
-/*
- * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved.
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This code is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 only, as
- * published by the Free Software Foundation.  Oracle designates this
- * particular file as subject to the "Classpath" exception as provided
- * by Oracle in the LICENSE file that accompanied this code.
- *
- * This code is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
- * version 2 for more details (a copy is included in the LICENSE file that
- * accompanied this code).
- *
- * You should have received a copy of the GNU General Public License version
- * 2 along with this work; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
- *
- * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
- * or visit www.oracle.com if you need additional information or have any
- * questions.
- */
-package java.util.functions;
-
-import java.util.Mapping;
-import java.util.streams.StatefulBiSink;
-import java.util.streams.StatefulSink;
-
-/**
- * BiSink
- *
- * @author Brian Goetz
- */
-public interface BiSink<T, U> extends Sink<Mapping<T,U>> {
-    void accept(T t, U u);
-
-    void accept(Mapping<T,U> mapping) default {
-        accept(mapping.getKey(), mapping.getValue());
-    }
-
-    BiSink<T,U> tee(BiSink<? super T, ? super U> other) default {
-        return (T t, U u) -> { other.accept(t, u); accept(t, u); };
-    }
-
-    @Override
-    StatefulBiSink<T,U,Void> asStatefulSink() default {
-        if (this instanceof StatefulSink)
-            return (StatefulBiSink<T, U, Void>) this;
-        else {
-            // @@@ Compiler bug workaround - have to capture a fake outer 'this'
-            final BiSink<T,U> thiz = this;
-            return new StatefulBiSink<T, U, Void>() {
-                @Override
-                public void begin(int size) { }
-
-                @Override
-                public Void end() {
-                    return null;
-                }
-
-                @Override
-                public void accept(T t, U u) {
-                    thiz.accept(t, u);
-                }
-            };
-        }
-    }
-}
--- a/src/share/classes/java/util/functions/FlatMapper.java	Thu Aug 30 11:42:29 2012 -0400
+++ b/src/share/classes/java/util/functions/FlatMapper.java	Thu Aug 30 19:40:53 2012 -0400
@@ -36,5 +36,5 @@
      * @param sink Destination for mapped elements.
      * @param element The element to be mapped.
      */
-    void flatMapInto(Sink<? super R> sink, T element);
+    void flatMapInto(Block<? super R> sink, T element);
 }
--- a/src/share/classes/java/util/functions/Sink.java	Thu Aug 30 11:42:29 2012 -0400
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,70 +0,0 @@
-/*
- * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved.
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This code is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 only, as
- * published by the Free Software Foundation.  Oracle designates this
- * particular file as subject to the "Classpath" exception as provided
- * by Oracle in the LICENSE file that accompanied this code.
- *
- * This code is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
- * version 2 for more details (a copy is included in the LICENSE file that
- * accompanied this code).
- *
- * You should have received a copy of the GNU General Public License version
- * 2 along with this work; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
- *
- * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
- * or visit www.oracle.com if you need additional information or have any
- * questions.
- */
-package java.util.functions;
-
-import java.util.streams.StatefulSink;
-
-/**
- * A receiver of elements. The counterpart to {@code Stream}.
- *
- * @param <T> The type of elements accepted by this Sink.
- *
- * @author Brian Goetz
- */
-public interface Sink<T> {
-    /**
-     * Accept an element.
-     *
-     * @param t The element to be accepted.
-     */
-    void accept(T t);
-
-    public Sink<T> tee(Sink<? super T> other) default {
-        return (T t) -> { other.accept(t); accept(t); };
-    }
-
-    public StatefulSink<T, Void> asStatefulSink() default {
-        if (this instanceof StatefulSink)
-            return (StatefulSink<T, Void>) this;
-        else {
-            // @@@ Compiler bug workaround - have to capture a fake outer 'this'
-            final Sink<T> thiz = this;
-            return new StatefulSink<T, Void>() {
-                @Override
-                public void begin(int size) { }
-
-                @Override
-                public Void end() {
-                    return null;
-                }
-
-                @Override
-                public void accept(T t) {
-                    thiz.accept(t);
-                }
-            };
-        }
-    }
-}
--- a/src/share/classes/java/util/streams/AbstractPipeline.java	Thu Aug 30 11:42:29 2012 -0400
+++ b/src/share/classes/java/util/streams/AbstractPipeline.java	Thu Aug 30 19:40:53 2012 -0400
@@ -27,7 +27,6 @@
 import java.util.*;
 import java.util.concurrent.ForkJoinTask;
 import java.util.concurrent.ForkJoinUtils;
-import java.util.functions.Sink;
 import java.util.streams.ops.*;
 
 /**
@@ -149,27 +148,19 @@
             sink.begin(-1); // @@@ supply size if known (iterator was null, chain is size-preserving, initial size known)
             while (chain.hasNext())
                 sink.accept(chain.next());
-            return sink.end();
+            sink.end();
+            return sink.getAndClearState();
         }
         else {
             // Push traversal
-            int nStateful = 0;
-            StatefulSink[] statefulSinks = new StatefulSink[ops.length];
             StatefulSink<U, V> terminalSink = terminal.sink();
             Sink sink = terminalSink;
-            terminalSink.begin(-1); // @@@ supply size if known to terminal stage
-            for (int i=ops.length-1; i >= 0; i--) {
+            for (int i=ops.length-1; i >= 0; i--)
                 sink = ops[i].sink(sink);
-                if (ops[i].isStateful()) {
-                    StatefulSink s = (StatefulSink) sink;
-                    statefulSinks[nStateful++] = s;
-                    s.begin(-1);  // @@@ supply size if known to *this* stage
-                }
-            }
+            sink.begin(-1); // @@@ supply size if known
             source.forEach(sink);
-            for (int i = nStateful-1; i >= 0; i--)
-                statefulSinks[i].end();
-            return terminalSink.end();
+            sink.end();
+            return terminalSink.getAndClearState();
         }
     }
 
@@ -190,14 +181,14 @@
             }
 
             @Override
-            public Sink<T> sink(Sink<? super U> sink) {
-                Sink<?> chain = sink;
+            public Sink<T, ?, ?> sink(Sink sink) {
+                Sink chain = sink;
                 for (int i = ops.length-1; i >= 0; i--) {
                     IntermediateOp op = ops[i];
                     chain = op.sink(chain);
                 }
 
-                return (Sink<T>) chain;
+                return chain;
             }
 
             @Override
--- a/src/share/classes/java/util/streams/AbstractSequentialStreamAccessor.java	Thu Aug 30 11:42:29 2012 -0400
+++ b/src/share/classes/java/util/streams/AbstractSequentialStreamAccessor.java	Thu Aug 30 19:40:53 2012 -0400
@@ -24,9 +24,7 @@
  */
 package java.util.streams;
 
-import java.util.Iterator;
 import java.util.Spliterator;
-import java.util.functions.Sink;
 
 /**
  * AbstractSequentialStreamAccessor
--- a/src/share/classes/java/util/streams/LinearPipeline.java	Thu Aug 30 11:42:29 2012 -0400
+++ b/src/share/classes/java/util/streams/LinearPipeline.java	Thu Aug 30 19:40:53 2012 -0400
@@ -90,8 +90,8 @@
     }
 
     @Override
-    public void forEach(Sink<? super U> sink) {
-        pipeline(new ForEachOp<>(sink));
+    public void forEach(Block<? super U> sink) {
+        pipeline(ForEachOp.make(sink));
     }
 
     @Override
--- a/src/share/classes/java/util/streams/MapPipeline.java	Thu Aug 30 11:42:29 2012 -0400
+++ b/src/share/classes/java/util/streams/MapPipeline.java	Thu Aug 30 19:40:53 2012 -0400
@@ -68,8 +68,8 @@
     }
 
     @Override
-    public void forEach(BiSink<K, V> sink) {
-        pipeline(new ForEachOp<>(sink));
+    public void forEach(BiBlock<K, V> sink) {
+        pipeline(ForEachOp.make(sink));
     }
 
     public<U> U pipeline(TerminalOp<Mapping<K,V>, U> op) {
--- a/src/share/classes/java/util/streams/MapStream.java	Thu Aug 30 11:42:29 2012 -0400
+++ b/src/share/classes/java/util/streams/MapStream.java	Thu Aug 30 19:40:53 2012 -0400
@@ -119,7 +119,7 @@
      *
      * @param sink the Sink via which all elements will be processed.
      */
-    void forEach(BiSink<K, V> sink);
+    void forEach(BiBlock<K, V> sink);
 
     // @@@ Map, or MapFillable?
     <A extends Map<K, V>> A into(A target);
--- a/src/share/classes/java/util/streams/MapStreamAccessor.java	Thu Aug 30 11:42:29 2012 -0400
+++ b/src/share/classes/java/util/streams/MapStreamAccessor.java	Thu Aug 30 19:40:53 2012 -0400
@@ -26,7 +26,7 @@
 
 import java.util.MapIterator;
 import java.util.Mapping;
-import java.util.functions.BiSink;
+import java.util.functions.BiBlock;
 
 /**
  * MapStreamAccessor
@@ -34,7 +34,7 @@
  * @author Brian Goetz
  */
 public interface MapStreamAccessor<K, V> extends StreamAccessor<Mapping<K,V>>, MapIterator<K,V> {
-    void forEach(BiSink<? super K, ? super V> sink);
+    void forEach(BiBlock<? super K, ? super V> sink);
 
     Stream.Shape getShape() default { return Stream.Shape.KEY_VALUE; }
 }
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/share/classes/java/util/streams/Sink.java	Thu Aug 30 19:40:53 2012 -0400
@@ -0,0 +1,117 @@
+/*
+ * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.  Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+package java.util.streams;
+
+import java.util.Mapping;
+import java.util.functions.Block;
+
+/**
+ * InternalSink
+ *
+ * @author Brian Goetz
+ */
+public interface Sink<T, K, V> extends Block<T> {
+    void accept(T t);
+
+    void accept(K k, V v) default { throw new IllegalStateException(); }
+
+    /**
+     * Reset the sink state to receive a fresh data set.  This is used when a
+     * Sink is being reused by multiple calculations.
+     * @param size The approximate size of the data to be pushed downstream, if
+     * known or -1 if size is unknown.
+     */
+    void begin(int size);
+
+    /**
+     * Indicate that all element that is going to be pushed has been pushed.
+     * Chained sinks should dump their contents downstream and clear any stored
+     * state; terminal sinks should return their terminal state and clear any
+     * stored state.
+     *
+     * @return The terminal state of the sink.
+     */
+    void end();
+
+    @Override
+    void apply(T t) default { accept(t); }
+
+
+    public interface OfLinear<T> extends Sink<T, Object, Object> {
+        @Override
+        void begin(int size) default { }
+
+        @Override
+        void end() default { }
+    }
+
+    public static abstract class ChainedLinear<T> implements OfLinear<T> {
+        protected final Sink downstream;
+
+        public ChainedLinear(Sink downstream) {
+            this.downstream = downstream;
+        }
+
+        @Override
+        public void begin(int size) {
+            downstream.begin(size);
+        }
+
+        @Override
+        public void end() {
+            downstream.end();
+        }
+    }
+
+    public interface OfMap<K, V> extends Sink<Mapping<K, V>, K, V> {
+        @Override
+        void accept(Mapping<K, V> mapping) default {
+            // @@@ Temporary sanity check -- probably should throw ISE here
+            System.out.println("Someone called accept(Mapping)");
+            accept(mapping.getKey(), mapping.getValue());
+        }
+
+        @Override
+        void accept(K k, V v);
+    }
+
+    public static abstract class ChainedMap<K,V> implements OfMap<K,V> {
+        protected final Sink downstream;
+
+        public ChainedMap(Sink downstream) {
+            this.downstream = downstream;
+        }
+
+        @Override
+        public void begin(int size) {
+            downstream.begin(size);
+        }
+
+        @Override
+        public void end() {
+            downstream.end();
+        }
+    }
+}
--- a/src/share/classes/java/util/streams/StatefulBiSink.java	Thu Aug 30 11:42:29 2012 -0400
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,36 +0,0 @@
-/*
- * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved.
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This code is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 only, as
- * published by the Free Software Foundation.  Oracle designates this
- * particular file as subject to the "Classpath" exception as provided
- * by Oracle in the LICENSE file that accompanied this code.
- *
- * This code is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
- * version 2 for more details (a copy is included in the LICENSE file that
- * accompanied this code).
- *
- * You should have received a copy of the GNU General Public License version
- * 2 along with this work; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
- *
- * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
- * or visit www.oracle.com if you need additional information or have any
- * questions.
- */
-package java.util.streams;
-
-import java.util.Mapping;
-import java.util.functions.BiSink;
-
-/**
- * StatefulBiSink
- *
- * @author Brian Goetz
- */
-public interface StatefulBiSink<K,V,R> extends BiSink<K,V>, StatefulSink<Mapping<K,V>,R> {
-}
--- a/src/share/classes/java/util/streams/StatefulSink.java	Thu Aug 30 11:42:29 2012 -0400
+++ b/src/share/classes/java/util/streams/StatefulSink.java	Thu Aug 30 19:40:53 2012 -0400
@@ -24,32 +24,23 @@
  */
 package java.util.streams;
 
-import java.util.functions.Sink;
-
 /**
- * A Sink which accumulates state as elements are accepted.
+ * A KitchenSink which accumulates state as elements are accepted.
  *
  * @param <T> The type of elements to be accepted.
- * @param <V> The type of the terminal state.
+ * @param <R> The type of the terminal state.
  *
  * @author Brian Goetz
  */
-public interface StatefulSink<T, V> extends Sink<T> {
+public interface StatefulSink<T, R> extends Sink<T, Object, Object> {
     /**
-     * Reset the sink state to receive a fresh data set.  This is used when a
-     * Sink is being reused by multiple calculations.
-     * @param size The approximate size of the data to be pushed downstream, if
-     * known or -1 if size is unknown.
+     * Retrieve, and clear, the current state of the sink.
      */
-    void begin(int size);
+    R getAndClearState();
 
-    /**
-     * Indicate that all element that is going to be pushed has been pushed.
-     * Chained sinks should dump their contents downstream and clear any stored
-     * state; terminal sinks should return their terminal state and clear any
-     * stored state.
-     *
-     * @return The terminal state of the sink. 
-     */
-    V end();
+    @Override
+    void begin(int size) default { }
+
+    @Override
+    void end() default { }
 }
--- a/src/share/classes/java/util/streams/Stream.java	Thu Aug 30 11:42:29 2012 -0400
+++ b/src/share/classes/java/util/streams/Stream.java	Thu Aug 30 19:40:53 2012 -0400
@@ -99,7 +99,7 @@
 
     Stream<T> cumulate(BinaryOperator<T> operator);
 
-    void forEach(Sink<? super T> sink);
+    void forEach(Block<? super T> sink);
 
     <A extends Fillable<? super T>> A into(A target);
 
--- a/src/share/classes/java/util/streams/StreamAccessor.java	Thu Aug 30 11:42:29 2012 -0400
+++ b/src/share/classes/java/util/streams/StreamAccessor.java	Thu Aug 30 19:40:53 2012 -0400
@@ -26,7 +26,7 @@
 
 import java.util.Iterator;
 import java.util.Spliterator;
-import java.util.functions.Sink;
+import java.util.functions.Block;
 
 /**
  * StreamAccessor
@@ -36,7 +36,7 @@
  * @author Brian Goetz
  */
 public interface StreamAccessor<T> {
-    void forEach(Sink<? super T> sink);
+    void forEach(Block<? super T> sink);
 
     Iterator<T> iterator();
 
--- a/src/share/classes/java/util/streams/StreamBuilder.java	Thu Aug 30 11:42:29 2012 -0400
+++ b/src/share/classes/java/util/streams/StreamBuilder.java	Thu Aug 30 19:40:53 2012 -0400
@@ -25,17 +25,17 @@
 package java.util.streams;
 
 import java.util.*;
-import java.util.functions.Sink;
+import java.util.functions.Block;
 
 /**
  * StreamBuilder
  *
  * @author Brian Goetz
  */
-public interface StreamBuilder<T> extends Sized, Streamable<T>, Traversable<T>, Sink<T> /* , Fillable<T> */ {
+public interface StreamBuilder<T> extends Sized, Streamable<T>, Traversable<T>, Sink.OfLinear<T> /* , Fillable<T> */ {
 
     @Override
-    void forEach(Sink<? super T> sink);
+    void forEach(Block<? super T> block);
 
     void clear();
 
--- a/src/share/classes/java/util/streams/StreamBuilders.java	Thu Aug 30 11:42:29 2012 -0400
+++ b/src/share/classes/java/util/streams/StreamBuilders.java	Thu Aug 30 19:40:53 2012 -0400
@@ -28,7 +28,7 @@
 import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
-import java.util.functions.Sink;
+import java.util.functions.Block;
 
 /**
  * Utilities for building streams.
@@ -91,9 +91,9 @@
         }
 
         @Override
-        public void forEach(Sink<? super T> sink) {
+        public void forEach(Block<? super T> block) {
             for (int i=0; i<curSize; i++) {
-                sink.accept(array[i]);
+                block.apply(array[i]);
             }
         }
 
@@ -174,8 +174,8 @@
         }
 
         @Override
-        public void forEach(Sink<? super T> sink) {
-            list.forEach(sink);
+        public void forEach(Block<? super T> block) {
+            list.forEach(block);
         }
 
         @Override
--- a/src/share/classes/java/util/streams/Streamable.java	Thu Aug 30 11:42:29 2012 -0400
+++ b/src/share/classes/java/util/streams/Streamable.java	Thu Aug 30 19:40:53 2012 -0400
@@ -24,10 +24,6 @@
  */
 package java.util.streams;
 
-import java.util.*;
-import java.util.functions.*;
-import java.util.streams.ops.*;
-
 /**
  * Streamable
  *
--- a/src/share/classes/java/util/streams/Streams.java	Thu Aug 30 11:42:29 2012 -0400
+++ b/src/share/classes/java/util/streams/Streams.java	Thu Aug 30 19:40:53 2012 -0400
@@ -25,7 +25,7 @@
 package java.util.streams;
 
 import java.util.*;
-import java.util.functions.Sink;
+import java.util.functions.Block;
 
 /**
  * Streams
@@ -83,7 +83,7 @@
             }
 
             @Override
-            public void forEach(Sink<? super T> sink) { }
+            public void forEach(Block<? super T> block) { }
 
             @Override
             public int getRemainingSizeIfKnown() {
@@ -124,9 +124,9 @@
         }
 
         @Override
-        public void forEach(Sink<? super T> sink) {
+        public void forEach(Block<? super T> block) {
             while (it.hasNext())
-                sink.accept(it.next());
+                block.apply(it.next());
         }
 
         @Override
@@ -173,8 +173,8 @@
         }
 
         @Override
-        public void forEach(Sink<? super T> sink) {
-            spliterator.forEach(sink);
+        public void forEach(Block<? super T> block) {
+            spliterator.forEach(block);
         }
 
         @Override
@@ -240,14 +240,14 @@
         }
 
         @Override
-        public void forEach(Sink<? super T> sink) {
+        public void forEach(Block<? super T> block) {
             if (iterator == null) {
-                traversable.forEach(sink);
+                traversable.forEach(block);
                 iterator = Collections.emptyIterator();
             }
             else {
                 while (iterator.hasNext())
-                    sink.accept(iterator.next());
+                    block.apply(iterator.next());
             }
         }
 
--- a/src/share/classes/java/util/streams/ops/BiAllMatchOp.java	Thu Aug 30 11:42:29 2012 -0400
+++ b/src/share/classes/java/util/streams/ops/BiAllMatchOp.java	Thu Aug 30 19:40:53 2012 -0400
@@ -47,7 +47,7 @@
 
     @Override
     public Boolean evaluate(Iterator<Mapping<K,V>> iterator) {
-        return allMatch(new MapIterator.IteratorAdapter(iterator), predicate);
+        return allMatch(new MapIterator.IteratorAdapter<>(iterator), predicate);
     }
 
     public static <K,V> boolean allMatch(MapIterator<K,V> iterator, BiPredicate<? super K, ? super V> predicate) {
--- a/src/share/classes/java/util/streams/ops/BiAnyMatchOp.java	Thu Aug 30 11:42:29 2012 -0400
+++ b/src/share/classes/java/util/streams/ops/BiAnyMatchOp.java	Thu Aug 30 19:40:53 2012 -0400
@@ -47,7 +47,7 @@
 
     @Override
     public Boolean evaluate(Iterator<Mapping<K,V>> iterator) {
-        return anyMatch(new MapIterator.IteratorAdapter(iterator), predicate);
+        return anyMatch(new MapIterator.IteratorAdapter<>(iterator), predicate);
     }
 
     public static <K,V> boolean anyMatch(MapIterator<K,V> iterator, BiPredicate<? super K, ? super V> predicate) {
--- a/src/share/classes/java/util/streams/ops/BiFilterOp.java	Thu Aug 30 11:42:29 2012 -0400
+++ b/src/share/classes/java/util/streams/ops/BiFilterOp.java	Thu Aug 30 19:40:53 2012 -0400
@@ -24,14 +24,9 @@
  */
 package java.util.streams.ops;
 
-import java.util.Iterator;
-import java.util.MapIterator;
-import java.util.Mapping;
-import java.util.NoSuchElementException;
-import java.util.Objects;
+import java.util.*;
 import java.util.functions.BiPredicate;
-import java.util.functions.BiSink;
-import java.util.functions.Sink;
+import java.util.streams.Sink;
 import java.util.streams.Stream;
 
 /**
@@ -62,25 +57,14 @@
     }
 
     @Override
-    public Sink<Mapping<K,V>> sink(final Sink<? super Mapping<K,V>> sink) {
-        if (!(sink instanceof BiSink)) {
-            throw new IllegalArgumentException(String.format("Expected argument sink to be of type BiSync; found %s", sink.getClass()));
-        }
-        final BiSink<K, V> biSink = (BiSink<K, V>) (BiSink) Objects.requireNonNull(sink);
-
-        return new BiSink<K,V>() {
+    public Sink<Mapping<K, V>, K, V> sink(Sink sink) {
+        return new Sink.ChainedMap<K, V>(sink) {
             @Override
             public void accept(K k, V v) {
                 if(predicate.test(k, v)) {
-                    biSink.accept(k, v);
+                    downstream.accept(k, v);
                 }
             }
-
-            @Override
-            public void accept(Mapping<K, V> entry) {
-                System.out.println("Warning: executing push(Entry)");
-                accept(entry.getKey(), entry.getValue());
-            }
         };
     }
 
--- a/src/share/classes/java/util/streams/ops/BiMapOp.java	Thu Aug 30 11:42:29 2012 -0400
+++ b/src/share/classes/java/util/streams/ops/BiMapOp.java	Thu Aug 30 19:40:53 2012 -0400
@@ -29,8 +29,7 @@
 import java.util.Mapping;
 import java.util.Objects;
 import java.util.functions.BiMapper;
-import java.util.functions.BiSink;
-import java.util.functions.Sink;
+import java.util.streams.Sink;
 import java.util.streams.Stream;
 
 /**
@@ -67,27 +66,17 @@
     }
 
     @Override
-    public BiSink<K,V> sink(final Sink<? super Mapping<K,U>> sink) {
-        if (!(sink instanceof BiSink)) {
-            throw new IllegalStateException("Expecting BiSink");
-        }
-        final BiSink<K, U> biSink = (BiSink<K, U>) (Sink) Objects.requireNonNull(sink);
-
-        return new BiSink<K,V>() {
+    public Sink<Mapping<K, V>, K, V> sink(Sink sink) {
+        return new Sink.ChainedMap<K,V>(sink) {
             @Override
             public void accept(K k, V v) {
-                biSink.accept(k, mapper.map(k, v));
-            }
-
-            @Override
-            public void accept(Mapping<K, V> entry) {
-                System.out.println("Warning: executing push(Entry)");
-                accept(entry.getKey(), entry.getValue());
+                downstream.accept(k, mapper.map(k, v));
             }
         };
     }
 
-    public static <K, V, VV> MapIterator<K, VV> iterator(Iterator<Mapping<K, V>> source, BiMapper<? super K, ? super V, ? extends VV> mapper) {
+    public static <K, V, VV> MapIterator<K, VV> iterator(final Iterator<Mapping<K, V>> source,
+                                                         final BiMapper<? super K, ? super V, ? extends VV> mapper) {
         Objects.requireNonNull(source);
         Objects.requireNonNull(mapper);
         return new MapIterator<K, VV>() {
@@ -100,7 +89,7 @@
 
             @Override
             public Mapping<K, VV> next() {
-                Mapping<K, V> next = source.next();
+                final Mapping<K, V> next = source.next();
                 current = new Mapping.AbstractMapping<K,VV>() {
                     @Override
                     public K getKey() {
--- a/src/share/classes/java/util/streams/ops/CumulateOp.java	Thu Aug 30 11:42:29 2012 -0400
+++ b/src/share/classes/java/util/streams/ops/CumulateOp.java	Thu Aug 30 19:40:53 2012 -0400
@@ -24,11 +24,11 @@
  */
 package java.util.streams.ops;
 
-import java.util.*;
-import java.util.concurrent.ForkJoinTask;
+import java.util.Iterator;
+import java.util.Objects;
+import java.util.Spliterator;
 import java.util.concurrent.RecursiveTask;
 import java.util.functions.BinaryOperator;
-import java.util.functions.Sink;
 import java.util.streams.*;
 
 /**
@@ -51,19 +51,15 @@
     }
 
     @Override
-    public StatefulSink<T, T> sink(final Sink<? super T> sink) {
+    public StatefulSink<T, T> sink(final Sink sink) {
         return new StatefulSink<T, T>() {
-            private boolean first = true;
+            private boolean first;
             private T state;
 
             @Override
             public void begin(int size) {
                 first = true;
-            }
-
-            @Override
-            public T end() {
-                return state;
+                sink.begin(size);
             }
 
             @Override
@@ -77,6 +73,13 @@
                 }
                 sink.accept(state);
             }
+
+            @Override
+            public T getAndClearState() {
+                T ret = state;
+                state = null;
+                return ret;
+            }
         };
     }
 
@@ -176,10 +179,12 @@
                     }
                     else {
                         leafData = StreamBuilders.make();
-                        StatefulSink<T, T> sink = sink(leafData);
+                        StatefulSink<T, T> terminalSink = sink(leafData);
+                        Sink<V, ?, ?> sink = problem.helper.sink(terminalSink);
                         sink.begin(-1);
-                        source.forEach(problem.helper.sink(sink));
-                        upward = sink.end();
+                        source.forEach(sink);
+                        sink.end();
+                        upward = terminalSink.getAndClearState();
                         // Special case -- if problem.depth == 0, just wrap the result and be done
                         if (isRoot())
                             return TreeUtils.node(leafData);
--- a/src/share/classes/java/util/streams/ops/FilterOp.java	Thu Aug 30 11:42:29 2012 -0400
+++ b/src/share/classes/java/util/streams/ops/FilterOp.java	Thu Aug 30 19:40:53 2012 -0400
@@ -28,7 +28,7 @@
 import java.util.NoSuchElementException;
 import java.util.Objects;
 import java.util.functions.Predicate;
-import java.util.functions.Sink;
+import java.util.streams.Sink;
 import java.util.streams.Stream;
 
 /**
@@ -56,13 +56,13 @@
     }
 
     @Override
-    public Sink<T> sink(final Sink<? super T> sink) {
+    public Sink<T, ?, ?> sink(final Sink sink) {
         Objects.requireNonNull(sink);
-        return new Sink<T>() {
+        return new Sink.ChainedLinear<T>(sink) {
             @Override
             public void accept(T t) {
                 if (predicate.test(t))
-                    sink.accept(t);
+                    downstream.accept(t);
             }
         };
     }
--- a/src/share/classes/java/util/streams/ops/FlatMapOp.java	Thu Aug 30 11:42:29 2012 -0400
+++ b/src/share/classes/java/util/streams/ops/FlatMapOp.java	Thu Aug 30 19:40:53 2012 -0400
@@ -28,7 +28,7 @@
 import java.util.NoSuchElementException;
 import java.util.Objects;
 import java.util.functions.FlatMapper;
-import java.util.functions.Sink;
+import java.util.streams.Sink;
 import java.util.streams.Stream;
 import java.util.streams.StreamBuilder;
 import java.util.streams.StreamBuilders;
@@ -58,11 +58,11 @@
     }
 
     @Override
-    public Sink<T> sink(final Sink<? super R> sink) {
+    public Sink<T,?,?> sink(final Sink sink) {
         Objects.requireNonNull(sink);
-        return new Sink<T>() {
+        return new Sink.ChainedLinear<T>(sink) {
             public void accept(T t) {
-                mapper.flatMapInto(sink, t);
+                mapper.flatMapInto(downstream, t);
             }
         };
     }
--- a/src/share/classes/java/util/streams/ops/FoldOp.java	Thu Aug 30 11:42:29 2012 -0400
+++ b/src/share/classes/java/util/streams/ops/FoldOp.java	Thu Aug 30 19:40:53 2012 -0400
@@ -27,6 +27,7 @@
 import java.util.Spliterator;
 import java.util.concurrent.RecursiveTask;
 import java.util.functions.*;
+import java.util.streams.Sink;
 import java.util.streams.StatefulSink;
 
 /**
@@ -61,7 +62,7 @@
             }
 
             @Override
-            public U end() {
+            public U getAndClearState() {
                 try {
                     return state;
                 }
@@ -107,10 +108,11 @@
             }
             else {
                 final StatefulSink<T, U> reduceStage = op.sink();
-                final Sink<V> chain = helper.sink(reduceStage);
-                reduceStage.begin(-1);
+                final Sink<V, ?, ?> chain = helper.sink(reduceStage);
+                chain.begin(-1);
                 source.forEach(chain);
-                return reduceStage.end();
+                chain.end();
+                return reduceStage.getAndClearState();
             }
         }
     }
--- a/src/share/classes/java/util/streams/ops/ForEachOp.java	Thu Aug 30 11:42:29 2012 -0400
+++ b/src/share/classes/java/util/streams/ops/ForEachOp.java	Thu Aug 30 19:40:53 2012 -0400
@@ -24,9 +24,12 @@
  */
 package java.util.streams.ops;
 
+import java.util.Mapping;
 import java.util.Spliterator;
 import java.util.concurrent.RecursiveAction;
-import java.util.functions.Sink;
+import java.util.functions.BiBlock;
+import java.util.functions.Block;
+import java.util.streams.Sink;
 import java.util.streams.StatefulSink;
 
 /**
@@ -35,21 +38,54 @@
  * @author Brian Goetz
  */
 public class ForEachOp<T> implements TerminalOp<T,Void> {
-    private final Sink<? super T> sink;
+    private final StatefulSink<T,Void> sink;
 
-    public ForEachOp(Sink<? super T> sink) {
+    protected ForEachOp(StatefulSink<T, Void> sink) {
         this.sink = sink;
     }
 
+    public static<T> ForEachOp<T> make(final Block<? super T> block) {
+        return new ForEachOp<>(new StatefulSink<T, Void>() {
+            @Override
+            public void accept(T t) {
+                block.apply(t);
+            }
+
+            @Override
+            public Void getAndClearState() {
+                return null;
+            }
+        });
+    }
+
+    public static<K,V> ForEachOp<Mapping<K,V>> make(final BiBlock<? super K, ? super V> block) {
+        return new ForEachOp<>(new StatefulSink<Mapping<K,V>, Void>() {
+            @Override
+            public void accept(Object k, Object v) {
+                block.apply((K) k, (V) v);
+            }
+
+            @Override
+            public void accept(Mapping<K, V> mapping) {
+                block.apply(mapping.getKey(), mapping.getValue());
+            }
+
+            @Override
+            public Void getAndClearState() {
+                return null;
+            }
+        });
+    }
+
     @Override
     public StatefulSink<T, Void> sink() {
-        return (StatefulSink<T, Void>) sink.asStatefulSink();
+        return sink;
     }
 
     @Override
     public <V> Void computeParallel(ParallelOpHelper<T, V> helper) {
         int depth = helper.suggestDepth();
-        Sink<V> compoundSink = helper.sink(sink);
+        Sink<V, ?, ?> compoundSink = helper.sink(sink);
         Spliterator<V> spliterator = helper.spliterator();
         if (depth == 0) {
             spliterator.forEach(compoundSink);
@@ -62,9 +98,9 @@
     private static class ForEachTask<T> extends RecursiveAction {
         private final int depth;
         private final Spliterator<T> spliterator;
-        private final Sink<T> sink;
+        private final Sink<T, ?, ?> sink;
 
-        private ForEachTask(int depth, Spliterator<T> spliterator, Sink<T> sink) {
+        private ForEachTask(int depth, Spliterator<T> spliterator, Sink<T, ?, ?> sink) {
             this.depth = depth;
             this.spliterator = spliterator;
             this.sink = sink;
--- a/src/share/classes/java/util/streams/ops/GroupByOp.java	Thu Aug 30 11:42:29 2012 -0400
+++ b/src/share/classes/java/util/streams/ops/GroupByOp.java	Thu Aug 30 19:40:53 2012 -0400
@@ -55,7 +55,7 @@
             }
 
             @Override
-            public Map<K, Streamable<T>> end() {
+            public Map<K, Streamable<T>> getAndClearState() {
                 Map<K, Streamable<T>> result = (Map) map;
                 map = null;
                 return result;
--- a/src/share/classes/java/util/streams/ops/IntermediateOp.java	Thu Aug 30 11:42:29 2012 -0400
+++ b/src/share/classes/java/util/streams/ops/IntermediateOp.java	Thu Aug 30 19:40:53 2012 -0400
@@ -25,7 +25,7 @@
 package java.util.streams.ops;
 
 import java.util.Iterator;
-import java.util.functions.Sink;
+import java.util.streams.Sink;
 import java.util.streams.Stream;
 
 /**
@@ -76,11 +76,12 @@
      * Return a sink which will accept elements, perform the operation upon
      * each element and send it to the provided sink.
      *
+     *
      * @param sink elements will be sent to this sink after the processing.
      * @return a sink which will accept elements and perform the operation upon
      * each element.
      */
-    Sink<T> sink(Sink<? super U> sink);
+    Sink<T, ?, ?> sink(Sink sink);
 
     Stream.Shape inputShape() default { return Stream.Shape.LINEAR; }
     Stream.Shape outputShape() default { return Stream.Shape.LINEAR; }
--- a/src/share/classes/java/util/streams/ops/MapExtractKeysOp.java	Thu Aug 30 11:42:29 2012 -0400
+++ b/src/share/classes/java/util/streams/ops/MapExtractKeysOp.java	Thu Aug 30 19:40:53 2012 -0400
@@ -27,8 +27,7 @@
 import java.util.Iterator;
 import java.util.MapIterator;
 import java.util.Mapping;
-import java.util.functions.BiSink;
-import java.util.functions.Sink;
+import java.util.streams.Sink;
 import java.util.streams.Stream;
 
 /**
@@ -36,7 +35,7 @@
  *
  * @author Brian Goetz
  */
-public class MapExtractKeysOp<K,V> implements StatelessOp<Mapping<K,V>, K> {
+public class MapExtractKeysOp<K, V> implements StatelessOp<Mapping<K, V>, K> {
     @Override
     public int getStreamFlags(int upstreamState) {
         return upstreamState & ~Stream.STATE_UNKNOWN_MASK_V1;
@@ -44,7 +43,7 @@
 
     @Override
     public Iterator<K> iterator(Iterator<Mapping<K, V>> in) {
-        final MapIterator<K,V> mapIterator = (MapIterator<K, V>) in;
+        final MapIterator<K, V> mapIterator = (MapIterator<K, V>) in;
         return new Iterator<K>() {
             @Override
             public boolean hasNext() {
@@ -59,17 +58,11 @@
     }
 
     @Override
-    public BiSink<K, V> sink(final Sink<? super K> sink) {
-        return new BiSink<K, V>() {
+    public Sink<Mapping<K, V>, K, V> sink(final Sink sink) {
+        return new Sink.ChainedMap<K, V>(sink) {
             @Override
             public void accept(K k, V v) {
-                sink.accept(k);
-            }
-
-            @Override
-            public void accept(Mapping<K, V> entry) {
-                System.out.println("Warning: accept(Entry) called");
-                sink.accept(entry.getKey());
+                downstream.accept(k);
             }
         };
     }
--- a/src/share/classes/java/util/streams/ops/MapExtractValuesOp.java	Thu Aug 30 11:42:29 2012 -0400
+++ b/src/share/classes/java/util/streams/ops/MapExtractValuesOp.java	Thu Aug 30 19:40:53 2012 -0400
@@ -27,8 +27,7 @@
 import java.util.Iterator;
 import java.util.MapIterator;
 import java.util.Mapping;
-import java.util.functions.BiSink;
-import java.util.functions.Sink;
+import java.util.streams.Sink;
 import java.util.streams.Stream;
 
 /**
@@ -36,7 +35,7 @@
  *
  * @author Brian Goetz
  */
-public class MapExtractValuesOp<K,V> implements StatelessOp<Mapping<K,V>, V> {
+public class MapExtractValuesOp<K, V> implements StatelessOp<Mapping<K, V>, V> {
     @Override
     public int getStreamFlags(int upstreamState) {
         return upstreamState & ~Stream.STATE_UNKNOWN_MASK_V1;
@@ -44,7 +43,7 @@
 
     @Override
     public Iterator<V> iterator(Iterator<Mapping<K, V>> in) {
-        final MapIterator<K,V> mapIterator = (MapIterator<K, V>) in;
+        final MapIterator<K, V> mapIterator = (MapIterator<K, V>) in;
         return new Iterator<V>() {
             @Override
             public boolean hasNext() {
@@ -59,17 +58,11 @@
     }
 
     @Override
-    public BiSink<K, V> sink(final Sink<? super V> sink) {
-        return new BiSink<K, V>() {
+    public Sink<Mapping<K, V>, K, V> sink(final Sink sink) {
+        return new Sink.ChainedMap<K, V>(sink) {
             @Override
             public void accept(K k, V v) {
-                sink.accept(v);
-            }
-
-            @Override
-            public void accept(Mapping<K, V> entry) {
-                System.out.println("Warning: accept(Entry) called");
-                sink.accept(entry.getValue());
+                downstream.accept(v);
             }
         };
     }
--- a/src/share/classes/java/util/streams/ops/MapFilterKeysOp.java	Thu Aug 30 11:42:29 2012 -0400
+++ b/src/share/classes/java/util/streams/ops/MapFilterKeysOp.java	Thu Aug 30 19:40:53 2012 -0400
@@ -27,9 +27,8 @@
 import java.util.Iterator;
 import java.util.Mapping;
 import java.util.Objects;
-import java.util.functions.BiSink;
 import java.util.functions.Predicate;
-import java.util.functions.Sink;
+import java.util.streams.Sink;
 import java.util.streams.Stream;
 
 /**
@@ -37,7 +36,7 @@
  *
  * @author Brian Goetz
  */
-public class MapFilterKeysOp<K, V> implements StatelessOp<Mapping<K,V>, Mapping<K,V>> {
+public class MapFilterKeysOp<K, V> implements StatelessOp<Mapping<K, V>, Mapping<K, V>> {
     private final Predicate<? super K> predicate;
 
     public MapFilterKeysOp(Predicate<? super K> predicate) {
@@ -55,24 +54,14 @@
     }
 
     @Override
-    public BiSink<K,V> sink(final Sink<? super Mapping<K, V>> sink) {
-        if (!(sink instanceof BiSink)) {
-            throw new IllegalStateException("Expecting BiSink");
-        }
-        final BiSink<K, V> biSink = (BiSink<K, V>) (Sink) sink;
-        return new BiSink<K,V>() {
+    public Sink<Mapping<K, V>, K, V> sink(final Sink sink) {
+        return new Sink.ChainedMap<K, V>(sink) {
             @Override
             public void accept(K k, V v) {
                 if (predicate.test(k)) {
-                    biSink.accept(k, v);
+                    downstream.accept(k, v);
                 }
             }
-
-            @Override
-            public void accept(Mapping<K, V> entry) {
-                System.out.println("Warning: executing push(Entry)");
-                accept(entry.getKey(), entry.getValue());
-            }
         };
     }
 }
--- a/src/share/classes/java/util/streams/ops/MapFilterValuesOp.java	Thu Aug 30 11:42:29 2012 -0400
+++ b/src/share/classes/java/util/streams/ops/MapFilterValuesOp.java	Thu Aug 30 19:40:53 2012 -0400
@@ -27,9 +27,8 @@
 import java.util.Iterator;
 import java.util.Mapping;
 import java.util.Objects;
-import java.util.functions.BiSink;
 import java.util.functions.Predicate;
-import java.util.functions.Sink;
+import java.util.streams.Sink;
 import java.util.streams.Stream;
 
 /**
@@ -55,24 +54,14 @@
     }
 
     @Override
-    public BiSink<K,V> sink(final Sink<? super Mapping<K, V>> sink) {
-        if (!(sink instanceof BiSink)) {
-            throw new IllegalStateException("Expecting BiSink");
-        }
-        final BiSink<K, V> biSink = (BiSink<K, V>) (Sink) sink;
-        return new BiSink<K,V>() {
+    public Sink<Mapping<K, V>, K, V> sink(Sink sink) {
+        return new Sink.ChainedMap<K, V>(sink) {
             @Override
             public void accept(K k, V v) {
                 if (predicate.test(v)) {
-                    biSink.accept(k, v);
+                    downstream.accept(k, v);
                 }
             }
-
-            @Override
-            public void accept(Mapping<K, V> entry) {
-                System.out.println("Warning: executing push(Entry)");
-                accept(entry.getKey(), entry.getValue());
-            }
         };
     }
 }
--- a/src/share/classes/java/util/streams/ops/MapMapValuesOp.java	Thu Aug 30 11:42:29 2012 -0400
+++ b/src/share/classes/java/util/streams/ops/MapMapValuesOp.java	Thu Aug 30 19:40:53 2012 -0400
@@ -27,9 +27,8 @@
 import java.util.Iterator;
 import java.util.Mapping;
 import java.util.Objects;
-import java.util.functions.BiSink;
 import java.util.functions.Mapper;
-import java.util.functions.Sink;
+import java.util.streams.Sink;
 import java.util.streams.Stream;
 
 /**
@@ -59,21 +58,11 @@
     }
 
     @Override
-    public BiSink<K,V> sink(final Sink<? super Mapping<K, U>> sink) {
-        if (!(sink instanceof BiSink)) {
-            throw new IllegalStateException("Expecting BiSink");
-        }
-        final BiSink<K, U> biSink = (BiSink<K, U>) (Sink) sink;
-        return new BiSink<K,V>() {
+    public Sink<Mapping<K, V>, K, V> sink(Sink sink) {
+        return new Sink.ChainedMap<K,V>(sink) {
             @Override
             public void accept(K k, V v) {
-                biSink.accept(k, mapper.map(v));
-            }
-
-            @Override
-            public void accept(Mapping<K, V> entry) {
-                System.out.println("Warning: executing push(Entry)");
-                accept(entry.getKey(), entry.getValue());
+                downstream.accept(k, mapper.map(v));
             }
         };
     }
--- a/src/share/classes/java/util/streams/ops/MapOp.java	Thu Aug 30 11:42:29 2012 -0400
+++ b/src/share/classes/java/util/streams/ops/MapOp.java	Thu Aug 30 19:40:53 2012 -0400
@@ -27,7 +27,7 @@
 import java.util.Iterator;
 import java.util.Objects;
 import java.util.functions.Mapper;
-import java.util.functions.Sink;
+import java.util.streams.Sink;
 import java.util.streams.Stream;
 
 /**
@@ -54,12 +54,11 @@
     }
 
     @Override
-    public Sink<T> sink(final Sink<? super R> sink) {
-        Objects.requireNonNull(sink);
-        return new Sink<T>() {
+    public Sink<T, ?, ?> sink(Sink sink) {
+        return new Sink.ChainedLinear<T>(sink) {
             @Override
             public void accept(T t) {
-                sink.accept(mapper.map(t));
+                downstream.accept(mapper.map(t));
             }
         };
     }
--- a/src/share/classes/java/util/streams/ops/MapSortedOp.java	Thu Aug 30 11:42:29 2012 -0400
+++ b/src/share/classes/java/util/streams/ops/MapSortedOp.java	Thu Aug 30 19:40:53 2012 -0400
@@ -25,15 +25,15 @@
 package java.util.streams.ops;
 
 import java.util.*;
-import java.util.functions.Sink;
-import java.util.streams.StatefulSink;
+import java.util.streams.Sink;
 import java.util.streams.Stream;
 
 
 /**
  * An operation which sorts elements.
  *
- * @param <T> Type of elements to be sorted.
+ * @param <K> Type of keys to be sorted
+ * @param <V> Values associated with keys to be sorted
  *
  */
 public class MapSortedOp<K extends Comparable<? super K>,V> implements StatefulOp<Mapping<K,V>,Mapping<K,V>,Void> {
@@ -68,8 +68,8 @@
     }
 
     @Override
-    public StatefulSink<Mapping<K, V>, Void> sink(Sink<? super Mapping<K, V>> sink) {
-        return new StatefulSink<Mapping<K,V>, Void>() {
+    public Sink<Mapping<K, V>, K, V> sink(Sink sink) {
+        return new Sink.ChainedMap<K, V>(sink) {
             PriorityQueue<Mapping<K,V>> pq;
             @Override
             public void begin(int size) {
@@ -77,11 +77,15 @@
             }
 
             @Override
-            public Void end() {
+            public void end() {
                 while (!pq.isEmpty()) {
-                    sink.accept(pq.remove());
+                    downstream.accept(pq.remove());
                 }
-                return null;
+            }
+
+            @Override
+            public void accept(K k, V v) {
+                // @@@ Do something here!
             }
 
             @Override
--- a/src/share/classes/java/util/streams/ops/MapSwapOp.java	Thu Aug 30 11:42:29 2012 -0400
+++ b/src/share/classes/java/util/streams/ops/MapSwapOp.java	Thu Aug 30 19:40:53 2012 -0400
@@ -26,8 +26,7 @@
 
 import java.util.Iterator;
 import java.util.Mapping;
-import java.util.functions.BiSink;
-import java.util.functions.Sink;
+import java.util.streams.Sink;
 import java.util.streams.Stream;
 
 /**
@@ -78,21 +77,11 @@
     }
 
     @Override
-    public BiSink<K, V> sink(final Sink<? super Mapping<V, K>> sink) {
-        if (!(sink instanceof BiSink)) {
-            throw new IllegalStateException("Expecting BiSink");
-        }
-        final BiSink<V, K> biSink = (BiSink<V, K>)(Sink)sink;
-        return new BiSink<K, V>() {
+    public Sink<Mapping<K, V>, K, V> sink(Sink sink) {
+        return new Sink.ChainedMap<K,V>(sink) {
             @Override
             public void accept(K k, V v) {
-                biSink.accept(v, k);
-            }
-
-            @Override
-            public void accept(Mapping<K, V> entry) {
-                System.out.println("Warning: executing accept(Entry)");
-                accept(entry.getKey(), entry.getValue());
+                downstream.accept(v, k);
             }
         };
     }
--- a/src/share/classes/java/util/streams/ops/MappedOp.java	Thu Aug 30 11:42:29 2012 -0400
+++ b/src/share/classes/java/util/streams/ops/MappedOp.java	Thu Aug 30 19:40:53 2012 -0400
@@ -25,9 +25,8 @@
 package java.util.streams.ops;
 
 import java.util.*;
-import java.util.functions.BiSink;
 import java.util.functions.Mapper;
-import java.util.functions.Sink;
+import java.util.streams.Sink;
 import java.util.streams.Stream;
 
 /**
@@ -114,14 +113,11 @@
     }
 
     @Override
-    public Sink<T> sink(final Sink<? super Mapping<T, R>> sink) {
-        if (!(sink instanceof BiSink)) {
-            throw new ClassCastException("Expecting BiSink");
-        }
-        final BiSink<T, R> biSink = (BiSink<T, R>) (BiSink) sink;
-        return new Sink<T>() {
+    public Sink<T,?,?> sink(final Sink sink) {
+        return new Sink.ChainedLinear<T>(sink) {
+            @Override
             public void accept(T t) {
-                biSink.accept(t, mapper.map(t));
+                downstream.accept(t, mapper.map(t));
             }
         };
     }
--- a/src/share/classes/java/util/streams/ops/ParallelOp.java	Thu Aug 30 11:42:29 2012 -0400
+++ b/src/share/classes/java/util/streams/ops/ParallelOp.java	Thu Aug 30 19:40:53 2012 -0400
@@ -27,7 +27,7 @@
 import java.util.Iterator;
 import java.util.Spliterator;
 import java.util.concurrent.ForkJoinTask;
-import java.util.functions.Sink;
+import java.util.streams.Sink;
 
 /**
  * ParallelOp
@@ -42,7 +42,7 @@
     public interface ParallelOpHelper<T, V> {
         public int suggestDepth();
         public Spliterator<V> spliterator();
-        public Sink<V> sink(Sink<? super T> sink);
+        public Sink<V, ?, ?> sink(Sink sink);
         public Iterator<T> iterator();
         public<Z> Z invoke(ForkJoinTask<Z> task);
     }
--- a/src/share/classes/java/util/streams/ops/SeedlessFoldOp.java	Thu Aug 30 11:42:29 2012 -0400
+++ b/src/share/classes/java/util/streams/ops/SeedlessFoldOp.java	Thu Aug 30 19:40:53 2012 -0400
@@ -68,7 +68,7 @@
             }
 
             @Override
-            public Optional<T> end() {
+            public Optional<T> getAndClearState() {
                 Optional<T> result = first ? Optional.<T>empty() : new Optional<>(state);
                 state = null;
                 return result;
--- a/src/share/classes/java/util/streams/ops/SortedOp.java	Thu Aug 30 11:42:29 2012 -0400
+++ b/src/share/classes/java/util/streams/ops/SortedOp.java	Thu Aug 30 19:40:53 2012 -0400
@@ -25,8 +25,7 @@
 package java.util.streams.ops;
 
 import java.util.*;
-import java.util.functions.Sink;
-import java.util.streams.StatefulSink;
+import java.util.streams.Sink;
 import java.util.streams.Stream;
 
 
@@ -64,20 +63,20 @@
     }
 
     @Override
-    public StatefulSink<T, Void> sink(final Sink<? super T> sink) {
-        return new StatefulSink<T, Void>() {
+    public Sink<T, ?, ?> sink(Sink sink) {
+        return new Sink.ChainedLinear<T>(sink) {
             PriorityQueue<T> pq;
             @Override
             public void begin(int size) {
                 pq = new PriorityQueue<>(size > 0 ? size : DEFAULT_PRIORITY_QUEUE_SIZE, comparator);
+                downstream.begin(size);
             }
 
             @Override
-            public Void end() {
-                while (!pq.isEmpty()) {
-                    sink.accept(pq.remove());
-                }
-                return null;
+            public void end() {
+                while (!pq.isEmpty())
+                    downstream.accept(pq.remove());
+                downstream.end();
             }
 
             @Override
--- a/src/share/classes/java/util/streams/ops/StatefulOp.java	Thu Aug 30 11:42:29 2012 -0400
+++ b/src/share/classes/java/util/streams/ops/StatefulOp.java	Thu Aug 30 19:40:53 2012 -0400
@@ -1,7 +1,6 @@
 package java.util.streams.ops;
 
 import java.util.*;
-import java.util.functions.*;
 import java.util.streams.*;
 
 /**
@@ -15,7 +14,7 @@
 public interface StatefulOp<T, U, V> extends IntermediateOp<T,U>, ParallelOp<T, TreeUtils.Node<U>> {
 
     @Override
-    public abstract StatefulSink<T, V> sink(Sink<? super U> sink);
+    public abstract Sink<T, ?, ?> sink(Sink sink);
 
     @Override
     public boolean isStateful() default {
@@ -26,12 +25,11 @@
     <Z> TreeUtils.Node<U> computeParallel(ParallelOpHelper<T, Z> helper) default {
         // dumb default serial implementation
         final StreamBuilder<U> sb = StreamBuilders.make();
-        StatefulSink<T, V> sSink = sink(sb);
-        Sink<Z> sink = helper.sink(sSink);
+        Sink<Z, ?, ?> sink = helper.sink(sink(sb));
         Spliterator<Z> spliterator = helper.spliterator();
-        sSink.begin(spliterator.getRemainingSizeIfKnown());
+        sink.begin(spliterator.getRemainingSizeIfKnown());
         spliterator.forEach(sink);
-        sSink.end();
+        sink.end();
         return TreeUtils.node(sb);
     }
 }
--- a/src/share/classes/java/util/streams/ops/TerminalOp.java	Thu Aug 30 11:42:29 2012 -0400
+++ b/src/share/classes/java/util/streams/ops/TerminalOp.java	Thu Aug 30 19:40:53 2012 -0400
@@ -26,7 +26,7 @@
 
 import java.util.Iterator;
 import java.util.Spliterator;
-import java.util.functions.Sink;
+import java.util.streams.Sink;
 import java.util.streams.StatefulSink;
 
 /**
@@ -47,7 +47,8 @@
         sink.begin(-1);
         while (iterator.hasNext())
             sink.accept(iterator.next());
-        return sink.end();
+        sink.end();
+        return sink.getAndClearState();
     }
 
     public StatefulSink<T, U> sink();
@@ -57,15 +58,12 @@
     @Override
     <V> U computeParallel(ParallelOpHelper<T, V> helper) default {
         // dumb default serial version
-        StatefulSink<T, U> sSink = sink();
-        Sink<V> vSink = helper.sink(sSink);
+        StatefulSink<T, U> toArraySink = sink();
+        Sink<V, ?, ?> sink = helper.sink(toArraySink);
         Spliterator<V> spliterator = helper.spliterator();
-        sSink.begin(spliterator.getRemainingSizeIfKnown());
-        // @@@ This is ridiculous!  Integrate Sink/StatefulSink functionality in a less hacky way
-        if (sSink == vSink)
-            spliterator.forEach(vSink);
-        else
-            TreeUtils.forEach(spliterator, vSink);
-        return sSink.end();
+        sink.begin(spliterator.getRemainingSizeIfKnown());
+        spliterator.forEach(sink);
+        sink.end();
+        return toArraySink.getAndClearState();
     }
 }
--- a/src/share/classes/java/util/streams/ops/ToArrayOp.java	Thu Aug 30 11:42:29 2012 -0400
+++ b/src/share/classes/java/util/streams/ops/ToArrayOp.java	Thu Aug 30 19:40:53 2012 -0400
@@ -66,15 +66,11 @@
             }
 
             @Override
-            public Object[] end() {
-                Object[] result;
-                if (count == elements.length) {
-                    result = elements;
-                } else {
-                    result = Arrays.copyOf(elements, count);
-                }
+            public Object[] getAndClearState() {
+                Object[] result = (count == elements.length)
+                                  ? elements
+                                  : Arrays.copyOf(elements, count);
                 elements = null;
-
                 return result;
             }
         };
--- a/src/share/classes/java/util/streams/ops/TreeUtils.java	Thu Aug 30 11:42:29 2012 -0400
+++ b/src/share/classes/java/util/streams/ops/TreeUtils.java	Thu Aug 30 19:40:53 2012 -0400
@@ -27,7 +27,7 @@
 import java.util.*;
 import java.util.concurrent.RecursiveAction;
 import java.util.concurrent.RecursiveTask;
-import java.util.functions.Sink;
+import java.util.functions.Block;
 import java.util.streams.*;
 
 /**
@@ -40,21 +40,6 @@
         throw new Error("no instances");
     }
 
-    // @@@ This method is a hack, in that helper.sink() might serve up a stateful sink.
-    // This logic should be more tightly reflected in the helper rather than expecting every client to
-    // do instanceof and casts.
-    static<T> void forEach(Spliterator<T> spliterator, Sink<? super T> sink) {
-        if (sink instanceof StatefulSink) {
-            StatefulSink<T, ?> sSink = (StatefulSink<T, ?>) sink;
-            sSink.begin(spliterator.getRemainingSizeIfKnown());
-            spliterator.forEach(sSink);
-            sSink.end();
-        }
-        else {
-            spliterator.forEach(sink);
-        }
-    }
-
     public static<T, U> Node<T> collect(ParallelOp.ParallelOpHelper<T, U> helper,
                                        boolean flattenLeaves,
                                        boolean flattenTree) {
@@ -68,12 +53,18 @@
             // Need to account for SIZED flag from pipeline
             if (size != -1 && splitSizesKnown) {
                 builder = StreamBuilders.makeFixed(size);
-                forEach(spliterator, helper.sink(builder));
+                Sink sink = helper.sink(builder);
+                sink.begin(spliterator.getRemainingSizeIfKnown());
+                spliterator.forEach(sink);
+                sink.end();
                 return node((T[]) builder.toArray());
             }
             else {
                 builder = StreamBuilders.make();
-                forEach(spliterator, helper.sink(builder));
+                Sink sink = helper.sink(builder);
+                sink.begin(spliterator.getRemainingSizeIfKnown());
+                spliterator.forEach(sink);
+                sink.end();
                 return node(builder);
             }
         }
@@ -227,9 +218,9 @@
             }
 
             @Override
-            public void forEach(Sink<? super T> sink) {
+            public void forEach(Block<? super T> block) {
                 for (T t : data)
-                    sink.accept(t);
+                    block.apply(t);
             }
 
             @Override
@@ -274,8 +265,8 @@
             }
 
             @Override
-            public void forEach(Sink<? super T> sink) {
-                data.forEach(sink);
+            public void forEach(Block<? super T> block) {
+                data.forEach(block);
             }
 
             @Override
@@ -343,9 +334,9 @@
         }
 
         @Override
-        public void forEach(Sink<? super T> sink) {
-            left.forEach(sink);
-            right.forEach(sink);
+        public void forEach(Block<? super T> block) {
+            left.forEach(block);
+            right.forEach(block);
         }
 
         @Override
@@ -408,14 +399,14 @@
             }
 
             @Override
-            public void forEach(Sink<? super T> sink) {
+            public void forEach(Block<? super T> block) {
                 if (iterator == null) {
-                    cur.forEach(sink);
+                    cur.forEach(block);
                     iterator = Collections.emptyIterator();
                 }
                 else {
                     while (iterator.hasNext())
-                        sink.accept(iterator.next());
+                        block.apply(iterator.next());
                 }
             }
 
--- a/src/share/classes/java/util/streams/ops/UniqOp.java	Thu Aug 30 11:42:29 2012 -0400
+++ b/src/share/classes/java/util/streams/ops/UniqOp.java	Thu Aug 30 19:40:53 2012 -0400
@@ -25,8 +25,7 @@
 package java.util.streams.ops;
 
 import java.util.*;
-import java.util.functions.Sink;
-import java.util.streams.StatefulSink;
+import java.util.streams.Sink;
 import java.util.streams.Stream;
 
 /**
@@ -54,25 +53,26 @@
     }
 
     @Override
-    public StatefulSink<T, Void> sink(final Sink<? super T> sink) {
-        return new StatefulSink<T, Void>() {
+    public Sink<T, ?, ?> sink(Sink sink) {
+        return new Sink.ChainedLinear<T>(sink) {
             Set<T> seen;
 
             @Override
             public void begin(int size) {
                 seen = new HashSet<>();
+                downstream.begin(size);
             }
 
             @Override
-            public Void end() {
+            public void end() {
                 seen = null;
-                return null;
+                downstream.end();
             }
 
             @Override
             public void accept(T t) {
                 if (seen.add(t)) {
-                    sink.accept(t);
+                    downstream.accept(t);
                 }
             }
         };
--- a/test-ng/tests/org/openjdk/tests/java/util/IteratorsNullTest.java	Thu Aug 30 11:42:29 2012 -0400
+++ b/test-ng/tests/org/openjdk/tests/java/util/IteratorsNullTest.java	Thu Aug 30 19:40:53 2012 -0400
@@ -24,17 +24,16 @@
  */
 package org.openjdk.tests.java.util;
 
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Factory;
+import org.testng.annotations.Test;
+
 import java.util.*;
 import java.util.functions.*;
 import java.util.streams.ops.FilterOp;
 import java.util.streams.ops.MapOp;
 import java.util.streams.ops.SortedOp;
 
-import org.testng.annotations.DataProvider;
-import org.testng.annotations.Factory;
-import org.testng.annotations.Test;
-
-
 import static org.openjdk.tests.java.util.LambdaTestHelpers.*;
 
 /**
@@ -43,7 +42,7 @@
 @Test(groups = { "null", "lambda" })
 public class IteratorsNullTest extends NullArgsTestCase {
     @Factory(dataProvider = "data")
-    public IteratorsNullTest(String name, Sink<Object[]> sink, Object[] args) {
+    public IteratorsNullTest(String name, Block<Object[]> sink, Object[] args) {
         super(name, sink, args);
     }
 
@@ -52,64 +51,64 @@
     public static Object[][] makeData() {
         List<Integer> list = countTo(10);
         return new Object[][] {
-                { "FilterOp.iterator", new Sink<Object[]>() {
-                    public void accept(Object[] args) {
+                { "FilterOp.iterator", new Block<Object[]>() {
+                    public void apply(Object[] args) {
                         FilterOp.iterator((Iterator<Integer>) args[0], (Predicate<Integer>) args[1]);
                     }
                 }, new Object[] { list.iterator(), pTrue }
                 },
-                { "MapOp.iterator", new Sink<Object[]>() {
-                    public void accept(Object[] args) {
+                { "MapOp.iterator", new Block<Object[]>() {
+                    public void apply(Object[] args) {
                         MapOp.iterator((Iterator<Integer>) args[0], (Mapper<Integer, Integer>) args[1]);
                     }
                 }, new Object[] { list.iterator(), mDoubler }
                 },
-                {"Iterators.reduce", new Sink<Object[]>() {
-                    public void accept(Object[] args) {
+                {"Iterators.reduce", new Block<Object[]>() {
+                    public void apply(Object[] args) {
                         Iterators.<Integer>reduce((Iterator<Integer>) args[0], 0, (BinaryOperator<Integer>) args[1]);
                     }
                 }, new Object[]{list.iterator(), rPlus}
                 },
-                { "Iterators.mapReduce", new Sink<Object[]>() {
-                    public void accept(Object[] args) {
+                { "Iterators.mapReduce", new Block<Object[]>() {
+                    public void apply(Object[] args) {
                         Iterators.<Integer, Integer>mapReduce((Iterator<Integer>) args[0],
                                                               (Mapper<Integer, Integer>) args[1],
                                                               0, (BinaryOperator<Integer>) args[2]);
                     }
                 }, new Object[] { list.iterator(), mDoubler, rPlus }
                 },
-                { "Iterators.mapReduce(int)", new Sink<Object[]>() {
-                    public void accept(Object[] args) {
+                { "Iterators.mapReduce(int)", new Block<Object[]>() {
+                    public void apply(Object[] args) {
                         Iterators.mapReduce((Iterator<Integer>) args[0],
                                             (IntMapper<Integer>) args[1],
                                             0, (IntBinaryOperator) args[2]);
                     }
                 }, new Object[] { list.iterator(), imDoubler, irPlus }
                 },
-                { "Iterators.mapReduce(long)", new Sink<Object[]>() {
-                    public void accept(Object[] args) {
+                { "Iterators.mapReduce(long)", new Block<Object[]>() {
+                    public void apply(Object[] args) {
                         Iterators.mapReduce((Iterator<Integer>) args[0],
                                             (LongMapper<Integer>) args[1],
                                             0, (LongBinaryOperator) args[2]);
                     }
                 }, new Object[] { asLongs(list).iterator(), lmDoubler, lrPlus }
                 },
-                { "Iterators.mapReduce(double)", new Sink<Object[]>() {
-                    public void accept(Object[] args) {
+                { "Iterators.mapReduce(double)", new Block<Object[]>() {
+                    public void apply(Object[] args) {
                         Iterators.mapReduce((Iterator<Integer>) args[0],
                                             (DoubleMapper<Integer>) args[1],
                                             0, (DoubleBinaryOperator) args[2]);
                     }
                 }, new Object[] { asDoubles(list).iterator(), dmDoubler, drPlus }
                 },
-                { "SortedOp.iterator", new Sink<Object[]>() {
-                    public void accept(Object[] args) {
+                { "SortedOp.iterator", new Block<Object[]>() {
+                    public void apply(Object[] args) {
                         SortedOp.iterator((Iterator<Integer>) args[0], Comparators.<Integer>naturalOrder());
                     }
                 }, new Object[] { list.iterator() }
                 },
-                { "Iterators.sorted(Comparator)", new Sink<Object[]>() {
-                    public void accept(Object[] args) {
+                { "Iterators.sorted(Comparator)", new Block<Object[]>() {
+                    public void apply(Object[] args) {
                         SortedOp.iterator((Iterator<Integer>) args[0], (Comparator) args[1]);
                     }
                 }, new Object[] { list.iterator(), cInteger }
--- a/test-ng/tests/org/openjdk/tests/java/util/LambdaTestHelpers.java	Thu Aug 30 11:42:29 2012 -0400
+++ b/test-ng/tests/org/openjdk/tests/java/util/LambdaTestHelpers.java	Thu Aug 30 19:40:53 2012 -0400
@@ -41,9 +41,9 @@
     public static final Mapper<Integer, Integer> mZero = x -> 0;
     public static final Mapper<Integer, Integer> mId = x -> x;
     public static final Mapper<Integer, Integer> mDoubler = x -> x * 2;
-    public static final FlatMapper<Integer, Integer> mfId = /*@@@ Sink::accept*/ (s,e) -> { s.accept(e); };
+    public static final FlatMapper<Integer, Integer> mfId = /*@@@ Sink::accept*/ (s,e) -> { s.apply(e); };
     public static final FlatMapper<Integer, Integer> mfNull = (s, e) -> { };
-    public static final FlatMapper<Integer, Integer> mfLt = (s, e) -> { for (int i=0; i<e; i++) s.accept(i); };
+    public static final FlatMapper<Integer, Integer> mfLt = (s, e) -> { for (int i=0; i<e; i++) s.apply(i); };
     public static final IntMapper<Integer> imDoubler = x -> x * 2;
     public static final LongMapper<Long> lmDoubler = x -> x * 2;
     public static final DoubleMapper<Double> dmDoubler = x -> x * 2;
@@ -61,14 +61,14 @@
 
     public static final FlatMapper<String, Character> flattenChars = new FlatMapper<String, Character>() {
         @Override
-        public void flatMapInto(Sink<? super Character> sink, String element) {
+        public void flatMapInto(Block<? super Character> sink, String element) {
             for (int i=0; i<element.length(); i++) {
-                sink.accept(element.charAt(i));
+                sink.apply(element.charAt(i));
             }
         }
     };
 
-    public static List<Integer> nullStream() {
+    public static List<Integer> empty() {
         ArrayList<Integer> list = new ArrayList<>();
         list.add(null);
         return list;
@@ -199,6 +199,7 @@
         assertTrue(!pI.hasNext());
     }
 
+    @SafeVarargs
     public static<T> void assertContents(Iterator<T> pi, T... values) {
         assertContents(pi, Arrays.asList(values).iterator());
     }
@@ -247,35 +248,12 @@
         }
     }
 
-    private static <T> Iterable<Iterable<T>> splitHelper(Spliterator<T> s, int depth, List<Iterable<T>> iterables) {
-        if (depth == 0) {
-            List<T> list = new ArrayList<>();
-            while (s.hasNext()) {
-                list.add(s.next());
-            }
-            iterables.add(list);
-        }
-        else {
-            splitHelper(s.split(), depth-1, iterables);
-            splitHelper(s, depth-1, iterables);
-        }
-        return iterables;
-    }
-
-    public static <T> Sink<T> iteratorToSink(Iterator<? extends T> iterator, Sink<? super T> sink) {
-        while(iterator.hasNext()) {
-            sink.accept(iterator.next());
-        }
-
-        return (Sink<T>) sink;
-    }
-
     public static <T,V> V iteratorToStatefulSink(Iterator<? extends T> iterator, StatefulSink<? super T, ? extends V> sink) {
         sink.begin(-1);
         while(iterator.hasNext()) {
             sink.accept(iterator.next());
         }
-
-        return sink.end();
+        sink.end();
+        return sink.getAndClearState();
     }
 }
--- a/test-ng/tests/org/openjdk/tests/java/util/NullArgsTestCase.java	Thu Aug 30 11:42:29 2012 -0400
+++ b/test-ng/tests/org/openjdk/tests/java/util/NullArgsTestCase.java	Thu Aug 30 19:40:53 2012 -0400
@@ -24,10 +24,10 @@
  */
 package org.openjdk.tests.java.util;
 
-import java.util.Arrays;
 import org.testng.annotations.Test;
 
-import java.util.functions.Sink;
+import java.util.Arrays;
+import java.util.functions.Block;
 
 import static org.testng.Assert.fail;
 
@@ -41,10 +41,10 @@
  */
 public abstract class NullArgsTestCase {
     public final String name;
-    public final Sink<Object[]> sink;
+    public final Block<Object[]> sink;
     public final Object[] args;
 
-    protected NullArgsTestCase(String name, Sink<Object[]> sink, Object[] args) {
+    protected NullArgsTestCase(String name, Block<Object[]> sink, Object[] args) {
         this.name = name;
         this.sink = sink;
         this.args = args;
@@ -52,7 +52,7 @@
 
     @Test
     public void goodNonNull() {
-        sink.accept(args);
+        sink.apply(args);
     }
 
     @Test
@@ -61,7 +61,7 @@
             Object[] temp = Arrays.copyOf(args, args.length);
             temp[i] = null;
             try {
-                sink.accept(temp);
+                sink.apply(temp);
                 fail(String.format("Expected NullPointerException for argument %d of test case %s", i, name));
             }
             catch (NullPointerException e) {
--- a/test-ng/tests/org/openjdk/tests/java/util/streams/ops/FlatMapOpTest.java	Thu Aug 30 11:42:29 2012 -0400
+++ b/test-ng/tests/org/openjdk/tests/java/util/streams/ops/FlatMapOpTest.java	Thu Aug 30 19:40:53 2012 -0400
@@ -41,12 +41,12 @@
  */
 @Test
 public class FlatMapOpTest extends StreamOpTestCase {
-    private static final FlatMapper<Integer, Integer> mfIntToBits = (sink, integer) -> {
+    private static final FlatMapper<Integer, Integer> mfIntToBits = (block, integer) -> {
         int num = integer;
         for (int i = 0; i < 16 && num != 0; i++) {
             if ((num & (1 << i)) != 0) {
                 num &= ~(1 << i);
-                sink.accept(i);
+                block.apply(i);
             }
         }
     };
--- a/test-ng/tests/org/openjdk/tests/java/util/streams/ops/StreamOpTestCase.java	Thu Aug 30 11:42:29 2012 -0400
+++ b/test-ng/tests/org/openjdk/tests/java/util/streams/ops/StreamOpTestCase.java	Thu Aug 30 19:40:53 2012 -0400
@@ -32,7 +32,6 @@
 import java.util.concurrent.ForkJoinUtils;
 import java.util.functions.BiPredicate;
 import java.util.functions.Block;
-import java.util.functions.Sink;
 import java.util.streams.*;
 import java.util.streams.ops.IntermediateOp;
 import java.util.streams.ops.ParallelOp;
@@ -47,7 +46,7 @@
 @Test
 public abstract class StreamOpTestCase extends Assert {
 
-    protected static<T> void assertMatches(StreamResult<T> refResult, Block<Sink<T>> block) {
+    protected static<T> void assertMatches(StreamResult<T> refResult, Block<Sink> block) {
         StreamResult<T> newResult = new StreamResult<>();
         block.apply(newResult);
         assertEquals(refResult, newResult);
@@ -80,24 +79,19 @@
 
         // Second pass -- create a sink and wrap it
         assertMatches(refResult, sink -> {
-            Sink<T> wrapped = sink(sink, ops);
-            if (wrapped instanceof StatefulSink) {
-                StatefulSink<T, ?> stateful = (StatefulSink<T, ?>)wrapped;
-                stateful.begin(-1);
-                data.forEach(stateful);
-                stateful.end();
-            } else {
-                data.forEach(wrapped);
-            }
+            Sink<T, ?, ?> wrapped = sink(sink, ops);
+            wrapped.begin(-1);
+            data.forEach(wrapped);
+            wrapped.end();
         });
 
         // Third pass -- wrap with SequentialPipeline.op, and iterate in push mode
-        assertMatches(refResult, sink -> { stream(data.seq(ops)).forEach((Sink) sink); });
+        assertMatches(refResult, (Sink sink) -> { stream(data.seq(ops)).forEach(sink); });
 
         // Wrap as stream, and iterate in pull mode
         assertMatches(refResult, sink -> {
             for (Iterator<?> seqIter = data.seq(ops).iterator(); seqIter.hasNext(); )
-                sink.accept((U) seqIter.next());
+                sink.accept(seqIter.next());
         });
 
         // Wrap as stream, and iterate in mixed mode
@@ -105,23 +99,24 @@
             Stream<?> stream = stream(data.seq(ops));
             Iterator<?> iter = stream.iterator();
             if (iter.hasNext())
-                sink.accept((U) iter.next());
-            stream.forEach((Sink) sink);
+                sink.accept(iter.next());
+            stream.forEach(sink);
         });
 
         // Wrap as parallel stream + sequential
-        assertMatches(refResult, sink -> { stream(data.par(ops)).sequential().forEach((Sink) sink); });
+        assertMatches(refResult, (Sink sink) -> { stream(data.par(ops)).sequential().forEach(sink); });
 
         // Wrap as parallel stream + toArray
         assertMatches(refResult, sink -> {
             for (Object t : stream(data.par(ops)).toArray())
-                sink.accept((U) t);
+                sink.accept(t);
         });
 
         // Wrap as parallel stream + into
         assertMatches(refResult, sink -> {
-            for (Object u : stream(data.par(ops)).sequential().into(new ArrayList()))
-                sink.accept((U) u);
+            ArrayList list = stream(data.par(ops)).sequential().into(new ArrayList());
+            for (Object u : list)
+                sink.accept(u);
         });
 
         // More ways to iterate the PSS: iterate result of op
@@ -153,13 +148,15 @@
             StatefulSink<T, U> sink = terminalOp.sink();
             sink.begin(-1);
             data.forEach(sink);
-            answer = sink.end();
+            sink.end();
+            answer = sink.getAndClearState();
 
             // Create a sink and evaluate, with size advice
             StatefulSink<T, U> sink2 = terminalOp.sink();
             sink2.begin(data.size());
             data.forEach(sink2);
-            U answer2 = sink2.end();
+            sink2.end();
+            U answer2 = sink2.getAndClearState();
             assertTrue(equalator.test(answer, answer2));
         }
 
@@ -191,7 +188,7 @@
         }
     }
 
-    static class StreamResult<T> implements Sink<T>, Traversable<T>, Sized {
+    static class StreamResult<T> implements Sink.OfLinear<T>, Traversable<T>, Sized {
 
         private Object[] array;
 
@@ -207,9 +204,9 @@
         }
 
         @Override
-        public void forEach(Sink<? super T> sink) {
+        public void forEach(Block<? super T> block) {
             for (int i = 0; i < offset; i++) {
-                sink.accept((T) array[i]);
+                block.apply((T) array[i]);
             }
         }
 
@@ -262,7 +259,7 @@
         }
 
         @Override
-        public Sink sink(Sink sink) {
+        public Sink<T, ?, ?> sink(Sink sink) {
             return sink;
         }
 
@@ -330,7 +327,7 @@
         AbstractPipeline<?, T> par();
         Iterator<T> iterator();
         Spliterator<T> spliterator();
-        void forEach(Sink<? super T> sink);
+        void forEach(Block<? super T> block);
         int size();
 
         <U> AbstractPipeline<?, U> seq(IntermediateOp<T, U> op) default {
@@ -417,9 +414,9 @@
         }
 
         @Override
-        public void forEach(Sink<? super T> sink) {
+        public void forEach(Block<? super T> block) {
             for (T t : array)
-                sink.accept(t);
+                block.apply(t);
         }
 
         @Override
@@ -465,8 +462,8 @@
         }
 
         @Override
-        public void forEach(Sink<? super T> sink) {
-            collection.forEach(sink);
+        public void forEach(Block<? super T> block) {
+            collection.forEach(block);
         }
 
         @Override
--- a/test-ng/tests/org/openjdk/tests/javac/LambdaTranslationTest1.java	Thu Aug 30 11:42:29 2012 -0400
+++ b/test-ng/tests/org/openjdk/tests/javac/LambdaTranslationTest1.java	Thu Aug 30 19:40:53 2012 -0400
@@ -26,7 +26,7 @@
 
 import org.testng.annotations.Test;
 
-import java.util.functions.Sink;
+import java.util.functions.Block;
 
 import static org.testng.Assert.assertEquals;
 
@@ -69,63 +69,63 @@
     }
 
     public void testLambdas() {
-        Sink<Object> b = t -> {setResult("Sink0::" + t);};
-        b.accept("Howdy");
+        Block<Object> b = t -> {setResult("Sink0::" + t);};
+        b.apply("Howdy");
         assertResult("Sink0::Howdy");
 
-        Sink<String> b1 = t -> {setResult("Sink1::" + t);};
-        b1.accept("Rowdy");
+        Block<String> b1 = t -> {setResult("Sink1::" + t);};
+        b1.apply("Rowdy");
         assertResult("Sink1::Rowdy");
 
         for (int i = 5; i < 10; ++i) {
-            Sink<Integer> b2 = t -> {setResult("Sink2::" + t);};
-            b2.accept(i);
+            Block<Integer> b2 = t -> {setResult("Sink2::" + t);};
+            b2.apply(i);
             assertResult("Sink2::" + i);
         }
 
-        Sink<Integer> b3 = t -> {setResult("Sink3::" + t);};
+        Block<Integer> b3 = t -> {setResult("Sink3::" + t);};
         for (int i = 900; i > 0; i -= 100) {
-            b3.accept(i);
+            b3.apply(i);
             assertResult("Sink3::" + i);
         }
 
         cntxt = "blah";
-        Sink<String> b4 = t -> {setResult(String.format("b4: %s .. %s", cntxt, t));};
-        b4.accept("Yor");
+        Block<String> b4 = t -> {setResult(String.format("b4: %s .. %s", cntxt, t));};
+        b4.apply("Yor");
         assertResult("b4: blah .. Yor");
 
         String flaw = "flaw";
-        Sink<String> b5 = t -> {setResult(String.format("b5: %s .. %s", flaw, t));};
-        b5.accept("BB");
+        Block<String> b5 = t -> {setResult(String.format("b5: %s .. %s", flaw, t));};
+        b5.apply("BB");
         assertResult("b5: flaw .. BB");
 
         cntxt = "flew";
-        Sink<String> b6 = t -> {setResult(String.format("b6: %s .. %s .. %s", t, cntxt, flaw));};
-        b6.accept("flee");
+        Block<String> b6 = t -> {setResult(String.format("b6: %s .. %s .. %s", t, cntxt, flaw));};
+        b6.apply("flee");
         assertResult("b6: flee .. flew .. flaw");
 
-        Sink<String> b7 = t -> {setResult(String.format("b7: %s %s", t, this.protectedSuperclassMethod()));};
-        b7.accept("this:");
+        Block<String> b7 = t -> {setResult(String.format("b7: %s %s", t, this.protectedSuperclassMethod()));};
+        b7.apply("this:");
         assertResult("b7: this: instance:flew");
 
-        Sink<String> b8 = t -> {setResult(String.format("b8: %s %s", t, super.protectedSuperclassMethod()));};
-        b8.accept("super:");
+        Block<String> b8 = t -> {setResult(String.format("b8: %s %s", t, super.protectedSuperclassMethod()));};
+        b8.apply("super:");
         assertResult("b8: super: I'm the sub");
 
-        Sink<String> b7b = t -> {setResult(String.format("b9: %s %s", t, protectedSuperclassMethod()));};
-        b7b.accept("implicit this:");
+        Block<String> b7b = t -> {setResult(String.format("b9: %s %s", t, protectedSuperclassMethod()));};
+        b7b.apply("implicit this:");
         assertResult("b9: implicit this: instance:flew");
 
-        Sink<Object> b10 = t -> {setResult(String.format("b10: new LT1Thing: %s", (new LT1Thing(t)).str));};
-        b10.accept("thing");
+        Block<Object> b10 = t -> {setResult(String.format("b10: new LT1Thing: %s", (new LT1Thing(t)).str));};
+        b10.apply("thing");
         assertResult("b10: new LT1Thing: thing");
 
-        Sink<Object> b11 = t -> {setResult(String.format("b11: %s", (new LT1Thing(t) {
+        Block<Object> b11 = t -> {setResult(String.format("b11: %s", (new LT1Thing(t) {
             String get() {
                 return "*" + str.toString() + "*";
             }
         }).get()));};
-        b11.accept(999);
+        b11.apply(999);
         assertResult("b11: *999*");
     }
 
@@ -162,17 +162,17 @@
         private int that = 1234;
 
         void doInner() {
-            Sink<String> i4 = t -> {setResult(String.format("i4: %d .. %s", that, t));};
-            i4.accept("=1234");
+            Block<String> i4 = t -> {setResult(String.format("i4: %d .. %s", that, t));};
+            i4.apply("=1234");
             assertResult("i4: 1234 .. =1234");
 
-            Sink<String> i5 = t -> {setResult(""); appendResult(t); appendResult(t);};
-            i5.accept("fruit");
+            Block<String> i5 = t -> {setResult(""); appendResult(t); appendResult(t);};
+            i5.apply("fruit");
             assertResult("fruitfruit");
 
             cntxt = "human";
-            Sink<String> b4 = t -> {setResult(String.format("b4: %s .. %s", cntxt, t));};
-            b4.accept("bin");
+            Block<String> b4 = t -> {setResult(String.format("b4: %s .. %s", cntxt, t));};
+            b4.apply("bin");
             assertResult("b4: human .. bin");
 
             final String flaw = "flaw";
@@ -182,17 +182,17 @@
  System.out.printf("c5: %s\n", c5.call() );
  **/
 
-            Sink<String> b5 = t -> {setResult(String.format("b5: %s .. %s", flaw, t));};
-            b5.accept("BB");
+            Block<String> b5 = t -> {setResult(String.format("b5: %s .. %s", flaw, t));};
+            b5.apply("BB");
             assertResult("b5: flaw .. BB");
 
             cntxt = "borg";
-            Sink<String> b6 = t -> {setResult(String.format("b6: %s .. %s .. %s", t, cntxt, flaw));};
-            b6.accept("flee");
+            Block<String> b6 = t -> {setResult(String.format("b6: %s .. %s .. %s", t, cntxt, flaw));};
+            b6.apply("flee");
             assertResult("b6: flee .. borg .. flaw");
 
-            Sink<String> b7b = t -> {setResult(String.format("b7b: %s %s", t, protectedSuperclassMethod()));};
-            b7b.accept("implicit outer this");
+            Block<String> b7b = t -> {setResult(String.format("b7b: %s %s", t, protectedSuperclassMethod()));};
+            b7b.apply("implicit outer this");
             assertResult("b7b: implicit outer this instance:borg");
 
             /**
--- a/test-ng/tests/org/openjdk/tests/separate/SourceModel.java	Thu Aug 30 11:42:29 2012 -0400
+++ b/test-ng/tests/org/openjdk/tests/separate/SourceModel.java	Thu Aug 30 19:40:53 2012 -0400
@@ -234,8 +234,10 @@
                            .swap()
                            .into(dependencies);
             // Do these last so that they override 
-            this.typeDependencies.stream().mapped(x->x.getName())
-                                 .swap().into(dependencies);
+            this.typeDependencies.stream()
+                    .mapped(x->x.getName())
+                    .swap()
+                    .into(dependencies);
             return dependencies.values();
         }
     }