changeset 6081:8ab20b8fe27a

Replace StreamAccessor.into with StreamAccessor.forEach. Sink protocol is embedded in PipelineHelper.
author psandoz
date Mon, 15 Oct 2012 15:45:20 -0700
parents b04cf326ce82
children c6ca097f901c
files src/share/classes/java/util/streams/AbstractPipeline.java src/share/classes/java/util/streams/ParallelPipelineHelper.java src/share/classes/java/util/streams/PipelineHelper.java src/share/classes/java/util/streams/StreamAccessor.java src/share/classes/java/util/streams/Streams.java src/share/classes/java/util/streams/ops/StatefulOp.java
diffstat 6 files changed, 49 insertions(+), 70 deletions(-) [+]
line wrap: on
line diff
--- a/src/share/classes/java/util/streams/AbstractPipeline.java	Mon Oct 15 15:45:10 2012 -0700
+++ b/src/share/classes/java/util/streams/AbstractPipeline.java	Mon Oct 15 15:45:20 2012 -0700
@@ -220,24 +220,21 @@
         }
 
         @Override
-        public void into(Sink<E_OUT, ?, ?> sink) {
+        public void wrapInto(Sink<E_OUT, ?, ?> sink) {
             Objects.requireNonNull(sink);
 
             // @@@ Need to check if any upstream streams have been pulled using iterator
             if (isShortCircuit() || iterator != null) {
-                into(AbstractPipeline.this.iterator(), sink);
+                Iterator<E_OUT> it = iterator();
+                sink.begin(-1);
+                while (it.hasNext())
+                    sink.accept(it.next());
+                sink.end();
             }  else {
-                getStreamAccessor().into(wrapSink(sink));
+                super.wrapInto(sink);
             }
         }
 
-        private void into(Iterator<E_OUT> it, Sink<E_OUT, ?, ?> sink) {
-            sink.begin(-1);
-            while (it.hasNext())
-                sink.accept(it.next());
-            sink.end();
-        }
-
         @Override
         public Iterator<E_OUT> iterator() {
             return AbstractPipeline.this.iterator();
--- a/src/share/classes/java/util/streams/ParallelPipelineHelper.java	Mon Oct 15 15:45:10 2012 -0700
+++ b/src/share/classes/java/util/streams/ParallelPipelineHelper.java	Mon Oct 15 15:45:20 2012 -0700
@@ -66,11 +66,25 @@
         sink.end();
     }
 
+    /**
+     * Create a wrapped iterator from a spliterator.
+     *
+     * @param sp the spliterator from which an iterator is obtained
+     *           and then wrapped (see {@link #wrapIterator(java.util.Iterator)}).
+     * @return the wrapped iterator.
+     */
     Iterator<P_OUT> iterator(Spliterator<P_IN> sp) default {
         return wrapIterator(sp.iterator());
     }
 
     /**
+     * Get the spliterator for the stream accessor that inputs elements to the pipeline.
+     *
+     * @return the spliterator.
+     */
+    Spliterator<P_IN> spliterator();
+
+    /**
      * Invoke a task in parallel using fork/join.
      *
      * @param task the fork/join task.
@@ -78,6 +92,4 @@
      * @return the fork/join result
      */
     <FJ_R> FJ_R invoke(ForkJoinTask<FJ_R> task);
-
-    Spliterator<P_IN> spliterator();
 }
--- a/src/share/classes/java/util/streams/PipelineHelper.java	Mon Oct 15 15:45:10 2012 -0700
+++ b/src/share/classes/java/util/streams/PipelineHelper.java	Mon Oct 15 15:45:20 2012 -0700
@@ -48,17 +48,23 @@
      * @return the result.
      */
     <R> R evaluateSequential(TerminalSink<P_OUT, R> sink) default {
-        into(sink);
+        wrapInto(sink);
         return sink.getAndClearState();
     }
 
     /**
-     * Create a sink chain and push elements from the source into that chain.
+     * Wrap a sink (see {@link #wrapSink(Sink)} that corresponds to the sink that
+     * accepts elements output from the pipeline, then push all elements obtained
+     * from the stream accessor into that wrapped sink.
      *
-     * @param sink the sink that is the last in the sink chain.
+     * @param sink the sink in which to wrap.
      */
-    void into(Sink<P_OUT, ?, ?> sink) default {
-        getStreamAccessor().into(wrapSink(sink));
+    void wrapInto(Sink<P_OUT, ?, ?> sink) default {
+        Sink<P_IN, ?, ?> wrappedSink = wrapSink(sink);
+        StreamAccessor<P_IN> source = getStreamAccessor();
+        wrappedSink.begin(source.estimateSize());
+        source.forEach(wrappedSink);
+        wrappedSink.end();
     }
 
     /**
--- a/src/share/classes/java/util/streams/StreamAccessor.java	Mon Oct 15 15:45:10 2012 -0700
+++ b/src/share/classes/java/util/streams/StreamAccessor.java	Mon Oct 15 15:45:20 2012 -0700
@@ -25,6 +25,7 @@
 package java.util.streams;
 
 import java.util.Iterator;
+import java.util.functions.Block;
 
 /**
  * StreamAccessor
@@ -34,15 +35,13 @@
  * @author Brian Goetz
  */
 public interface StreamAccessor<T> {
+
     /**
      * Provides any remaining elements into the provided sink.
-     * <p>The implementation of this method must, before accepting elements on the sink,
-     * invoke {@link Sink#begin(int)}, and after accepting all elements, invoke
-     * {@link Sink#end()}</p>
      *
      * @param sink The sink to which elements will be provided.
      */
-    void into(Sink<T, ?, ?> sink);
+    void forEach(Block<T> sink);
 
     /**
      * Return the iterator for the elements of this stream. The same iterator
--- a/src/share/classes/java/util/streams/Streams.java	Mon Oct 15 15:45:10 2012 -0700
+++ b/src/share/classes/java/util/streams/Streams.java	Mon Oct 15 15:45:20 2012 -0700
@@ -340,11 +340,9 @@
         }
 
         @Override
-        public void into(Sink<T, ?, ?> sink) {
-            sink.begin(getSizeIfKnown());
+        public void forEach(Block<T> sink) {
             while (it.hasNext())
-                sink.accept(it.next());
-            sink.end();
+                sink.apply(it.next());
         }
 
         @Override
@@ -391,7 +389,7 @@
         }
 
         @Override
-        public void into(Sink<T, ?, ?> sink) {
+        public void forEach(Block<T> sink) {
             // Implementing this method would result in a infinite loop
             // @@@ No mechanism for ops to short circuit push loop
             // @@@ Spliterator to the rescue? arity of 1, n elements for lhs, rhs is always infinite
@@ -441,10 +439,8 @@
         }
 
         @Override
-        public void into(Sink<T, ?, ?> sink) {
-            sink.begin(spliterator.getSizeIfKnown());
+        public void forEach(Block<T> sink) {
             spliterator.forEach(sink);
-            sink.end();
         }
 
         @Override
@@ -507,17 +503,15 @@
         }
 
         @Override
-        public void into(Sink<T, ?, ?> sink) {
-            sink.begin(getSizeIfKnown());
+        public void forEach(Block<T> sink) {
             if (iterator == null) {
                 traversable.forEach(sink);
                 iterator = Collections.emptyIterator();
             }
             else {
                 while (iterator.hasNext())
-                    sink.accept(iterator.next());
+                    sink.apply(iterator.next());
             }
-            sink.end();
         }
 
         @Override
@@ -572,20 +566,9 @@
         }
 
         @Override
-        public void into(final Sink<Mapping<K,V>, ?, ?> sink) {
-            Sink<Mapping<K, V>, K, V> castSink = (Sink<Mapping<K, V>, K, V>) sink;
-            if (iterator == null) {
-                traversable.forEach(castSink);
-                // @@@ Collections.emptyMapIterator ?
-                iterator = MapIterator.IteratorAdapter.adapt(Collections.<Mapping<K,V>>emptyIterator());
-            }
-            else {
-                while (iterator.hasNext()) {
-                    K k = iterator.nextKey();
-                    V v = iterator.curValue();
-                    castSink.accept(k, v);
-                }
-            }
+        public void forEach(Block<Mapping<K, V>> sink) {
+            // @@@ Need to transfer Block into BiBlock
+            forEach((BiBlock<? super K,? super V> ) sink);
         }
 
         @Override
@@ -841,14 +824,12 @@
         }
 
         @Override
-        public void into(Sink<T, ?, ?> sink) {
-            sink.begin(getSizeIfKnown());
+        public void forEach(Block<T> sink) {
             for (int i=offset; i<endOffset; i++) {
-                sink.accept(elements[i]);
+                sink.apply(elements[i]);
             }
             // update only once; reduce heap write traffic
             offset = endOffset;
-            sink.end();
         }
 
         @Override
@@ -879,13 +860,6 @@
         }
 
         @Override
-        public void into(Sink<T, ?, ?> sink) {
-            sink.begin(getSizeIfKnown());
-            forEach(sink);
-            sink.end();
-        }
-
-        @Override
         public int getStreamFlags() {
             return StreamOpFlags.IS_SIZED | StreamOpFlags.IS_ORDERED;
         }
@@ -918,14 +892,12 @@
         }
 
         @Override
-        public void into(Sink<T, ?, ?> sink) {
-            sink.begin(getSizeIfKnown());
+        public void forEach(Block<T> sink) {
             for (int i=offset; i<endOffset; i++) {
-                sink.accept(elements.get(i));
+                sink.apply(elements.get(i));
             }
             // update only once; reduce heap write traffic
             offset = endOffset;
-            sink.end();
         }
 
         @Override
@@ -961,13 +933,6 @@
         }
 
         @Override
-        public void into(Sink<T, ?, ?> sink) {
-            sink.begin(getSizeIfKnown());
-            forEach(sink);
-            sink.end();
-        }
-
-        @Override
         public int getStreamFlags() {
             return StreamOpFlags.IS_SIZED | StreamOpFlags.IS_ORDERED;
         }
--- a/src/share/classes/java/util/streams/ops/StatefulOp.java	Mon Oct 15 15:45:10 2012 -0700
+++ b/src/share/classes/java/util/streams/ops/StatefulOp.java	Mon Oct 15 15:45:20 2012 -0700
@@ -21,7 +21,7 @@
     @Override
     <P_IN> Node<E_OUT> evaluateSequential(PipelineHelper<P_IN, E_IN> helper) default {
         final NodeBuilder<E_OUT> nb = Nodes.makeBuilder();
-        helper.into(wrapSink(nb));
+        helper.wrapInto(wrapSink(nb));
         return nb;
     }
 }