changeset 6398:3b1593c9f320

Refactor AbstractTask hierarchy to create cancelable and short-circuitable tasks; refine FindAny,FindFirst,Match to use same; combine Limit and Skip operations into a combined Slice operation; initial work on parallelizing Slice
author briangoetz
date Wed, 14 Nov 2012 11:41:51 -0500
parents 218bdab80e30
children abcf56edb55c
files src/share/classes/java/util/streams/ReferencePipeline.java src/share/classes/java/util/streams/ops/AbstractTask.java src/share/classes/java/util/streams/ops/FindAnyOp.java src/share/classes/java/util/streams/ops/FindFirstOp.java src/share/classes/java/util/streams/ops/LimitOp.java src/share/classes/java/util/streams/ops/MatchOp.java src/share/classes/java/util/streams/ops/Nodes.java src/share/classes/java/util/streams/ops/OpUtils.java src/share/classes/java/util/streams/ops/SkipOp.java src/share/classes/java/util/streams/ops/SliceOp.java src/share/classes/java/util/streams/ops/TreeUtils.java test-ng/tests/org/openjdk/tests/java/util/streams/ops/LimitOpTest.java test-ng/tests/org/openjdk/tests/java/util/streams/ops/SkipOpTest.java
diffstat 13 files changed, 530 insertions(+), 340 deletions(-) [+]
line wrap: on
line diff
--- a/src/share/classes/java/util/streams/ReferencePipeline.java	Mon Nov 12 20:17:39 2012 -0500
+++ b/src/share/classes/java/util/streams/ReferencePipeline.java	Wed Nov 14 11:41:51 2012 -0500
@@ -91,12 +91,12 @@
 
     @Override
     public Stream<U> limit(int limit) {
-        return pipeline(new LimitOp<U>(limit));
+        return pipeline(new SliceOp<U>(0, limit));
     }
 
     @Override
     public Stream<U> skip(int n) {
-        return pipeline(new SkipOp<U>(n));
+        return pipeline(new SliceOp<U>(n));
     }
 
     @Override
--- a/src/share/classes/java/util/streams/ops/AbstractTask.java	Mon Nov 12 20:17:39 2012 -0500
+++ b/src/share/classes/java/util/streams/ops/AbstractTask.java	Wed Nov 14 11:41:51 2012 -0500
@@ -25,28 +25,48 @@
 package java.util.streams.ops;
 
 import java.util.concurrent.CountedCompleter;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.streams.ParallelPipelineHelper;
 import java.util.streams.Spliterator;
 
 /**
- * AbstractTask
+ * Abstract base class for most fork-join tasks used to implement stream ops.
+ * Manages splitting logic, tracking of child tasks, and intermediate results.
+ * While the <code>getRawResult</code> and <code>setRawResult</code> methods of
+ * <code>CountedCompleter</code> are initially unwired, this class uses them to
+ * manage per-task result storage.
+ *
+ * Splitting and setting up the child task links is done at <code>compute()</code> time
+ * for non-leaf nodes.  At <code>compute()</code> time for leaf nodes, it is guaranteed
+ * that the parent's child-related fields (including sibling links for the parent's children)
+ * will be set up for all children.
  *
  * @param <P_IN> Type of elements input to the pipeline
  * @param <P_OUT> Type of elements output from the pipeline
- * @param <R> Type of result
+ * @param <R> Type of intermediate result, may be different from operation result type
  * @param <T> Type of child and sibling tasks.
  *
  * @author Brian Goetz
  */
 abstract class AbstractTask<P_IN, P_OUT, R, T extends AbstractTask<P_IN, P_OUT, R, T>>
         extends CountedCompleter<R> {
+
+    /** The pipeline helper, common to all tasks in a computation */
     protected final ParallelPipelineHelper<P_IN, P_OUT> helper;
+
+    /** The spliterator for the portion of the input associated with the subtree rooted at this task */
     protected final Spliterator<P_IN> spliterator;
 
+    /** How many children does this task have? */
     protected int numChildren;
+
+    /** This task's first child.  Children are stored in a linked list, using the <code>nextSibling</code> field
+     * as the link to the next child. */
     protected T children;
+
+    /** Next sibling of this task */
     protected T nextSibling;
-    protected boolean isLeaf;
+
     private R rawResult;
 
     protected AbstractTask(ParallelPipelineHelper<P_IN, P_OUT> helper) {
@@ -61,32 +81,52 @@
         this.spliterator = spliterator;
     }
 
+    /** Construct a new node of type T whose parent is the receiver; must call
+     * the AbstractTask(T, Spliterator) constructor with the receiver and the provided Spliterator. */
     protected abstract T makeChild(Spliterator<P_IN> spliterator);
 
+    /** Compute the result associated with a leaf node */
     protected abstract R doLeaf();
 
+    /**
+     * Retrieve a result previously stored with <code>setRawResult</code>
+     */
     @Override
     public R getRawResult() {
         return rawResult;
     }
 
+    /**
+     * Associate the result with the task, can be retrieved with <code>getRawResult</code>
+     */
     @Override
     protected void setRawResult(R r) {
         rawResult = r;
     }
 
+    /** Is this task a leaf node?  (Only valid after <code>compute()</code> has been called on this node).
+     * If the node is not a leaf node, then children will be non-null and numChildren will be positive. */
+    protected boolean isLeaf() {
+        return (children == null);
+    }
+
+    /**
+     * Return the parent of this task, or null if this task is the root
+     */
     protected T getParent() {
         return (T) getCompleter();
     }
 
-    protected void completeRoot(R result) {
-        ((CountedCompleter<R>)getRoot()).complete(result);
-    }
-
+    /**
+     * Decide whether or not to split this task further or compute it directly.
+     * If computing directly, call <code>doLeaf</code> and pass the result to
+     * <code>setRawResult</code>.  If splitting, set up the child-related fields,
+     * create the child tasks, fork the rightmost child tasks, and compute the leftmost
+     * child task.
+     */
     @Override
     public void compute() {
-        isLeaf = !helper.suggestSplit(spliterator);
-        if (isLeaf) {
+        if (!helper.suggestSplit(spliterator)) {
             setRawResult(doLeaf());
             helpComplete();
         }
@@ -126,5 +166,110 @@
             }
         }
     }
+
+    protected boolean isLeftSpine() {
+        T node = (T) this;
+        while (node != null) {
+            T parent = node.getParent();
+            if (parent != null && parent.children != node)
+                return false;
+            node = parent;
+        }
+        return true;
+    }
 }
 
+abstract class AbstractCancelableTask<P_IN, P_OUT, R, T extends AbstractCancelableTask<P_IN, P_OUT, R, T>>
+        extends AbstractTask<P_IN, P_OUT, R, T> {
+    protected volatile boolean canceled;
+
+    protected AbstractCancelableTask(ParallelPipelineHelper<P_IN, P_OUT> helper) {
+        super(helper);
+    }
+
+    protected AbstractCancelableTask(T parent, Spliterator<P_IN> spliterator) {
+        super(parent, spliterator);
+    }
+
+    protected boolean taskCancelled() {
+        boolean cancel = canceled;
+        if (!cancel)
+            for (T parent = getParent(); !cancel && parent != null; parent = parent.getParent())
+                cancel = parent.canceled;
+        return cancel;
+    }
+
+    protected abstract R getEmptyResult();
+
+    @Override
+    public void compute() {
+        if (taskCancelled()) {
+            setRawResult(getEmptyResult());
+            helpComplete();
+        }
+        else
+            super.compute();
+    }
+
+    protected void cancel() {
+        canceled = true;
+    }
+
+    protected void cancelLaterNodes() {
+        // @@@ Avoid redundant cancels
+        T parent = getParent();
+        for (T sibling = this.nextSibling; sibling != null; sibling = sibling.nextSibling)
+            if (!sibling.canceled)
+                sibling.canceled = true;
+        // Go up the tree, cancel later siblings of all parents
+        if (parent != null)
+            parent.cancelLaterNodes();
+    }
+}
+
+abstract class AbstractShortCircuitTask<P_IN, P_OUT, R, T extends AbstractShortCircuitTask<P_IN, P_OUT, R, T>>
+        extends AbstractCancelableTask<P_IN, P_OUT, R, T> {
+    protected final AtomicReference<R> atomicAnswer;
+
+    protected AbstractShortCircuitTask(ParallelPipelineHelper<P_IN, P_OUT> helper) {
+        super(helper);
+        atomicAnswer = new AtomicReference<>(null);
+    }
+
+    protected AbstractShortCircuitTask(T parent, Spliterator<P_IN> spliterator) {
+        super(parent, spliterator);
+        atomicAnswer = parent.atomicAnswer;
+    }
+
+    protected void shortCircuit(R r) {
+        if (r != null)
+            atomicAnswer.compareAndSet(null, r);
+    }
+
+    @Override
+    protected void setRawResult(R r) {
+        if (getParent() == null && r != null)
+            shortCircuit(r);
+        else
+            super.setRawResult(r);
+    }
+
+    @Override
+    public R getRawResult() {
+        if (getParent() == null) {
+            R answer = atomicAnswer.get();
+            return (answer == null) ? getEmptyResult() : answer;
+        }
+        else
+            return super.getRawResult();
+    }
+
+    @Override
+    public void compute() {
+        // Have we already found an answer?
+        if (atomicAnswer.get() != null)
+            helpComplete();
+        else
+            super.compute();
+    }
+}
--- a/src/share/classes/java/util/streams/ops/FindAnyOp.java	Mon Nov 12 20:17:39 2012 -0500
+++ b/src/share/classes/java/util/streams/ops/FindAnyOp.java	Wed Nov 14 11:41:51 2012 -0500
@@ -70,17 +70,13 @@
         return iterator.hasNext() ? new Optional<>(iterator.next()) : Optional.<T>empty();
     }
 
-    private static class FindAnyTask<S, T> extends AbstractTask<S, T, Optional<T>, FindAnyTask<S, T>> {
-        private final AtomicReference<Optional<T>> answer;
-
+    private static class FindAnyTask<S, T> extends AbstractShortCircuitTask<S, T, Optional<T>, FindAnyTask<S, T>> {
         private FindAnyTask(ParallelPipelineHelper<S, T> helper) {
             super(helper);
-            this.answer = new AtomicReference<>(null);
         }
 
         private FindAnyTask(FindAnyTask<S, T> parent, Spliterator<S> spliterator) {
             super(parent, spliterator);
-            this.answer = parent.answer;
         }
 
         @Override
@@ -89,41 +85,16 @@
         }
 
         @Override
-        public void compute() {
-            // Have we already found an answer?
-            if (answer.get() != null)
-                helpComplete();
-            else
-                super.compute();
+        protected Optional<T> getEmptyResult() {
+            return Optional.empty();
         }
 
         @Override
         protected Optional<T> doLeaf() {
             Iterator<T> iterator = helper.wrapIterator(spliterator.iterator());
-            if (iterator.hasNext()) {
-                completeRoot(new Optional<>(iterator.next()));
-            }
+            if (iterator.hasNext())
+                shortCircuit(new Optional<>(iterator.next()));
             return null;
         }
-
-        @Override
-        public Optional<T> getRawResult() {
-            Optional<T> result = answer.get();
-            return result == null ? Optional.<T>empty() : result;
-        }
-
-        @Override
-        protected void setRawResult(Optional<T> result) {
-            if (result != null)
-                answer.compareAndSet(null, result);
-        }
-
-        // @@@ In CC.complete(r), should onCompletion be called before setRawResult(r) ?
-        // Below code is subtly flawed for this reason.  Currently work around this by mangling getRawResult.
-//        @Override
-//        public void onCompletion(CountedCompleter caller) {
-//            if (getParent() == null)
-//                setRawResult(Optional.<T>empty());
-//        }
     }
 }
--- a/src/share/classes/java/util/streams/ops/FindFirstOp.java	Mon Nov 12 20:17:39 2012 -0500
+++ b/src/share/classes/java/util/streams/ops/FindFirstOp.java	Wed Nov 14 11:41:51 2012 -0500
@@ -27,7 +27,6 @@
 import java.util.Iterator;
 import java.util.Optional;
 import java.util.concurrent.CountedCompleter;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.streams.ParallelPipelineHelper;
 import java.util.streams.PipelineHelper;
 import java.util.streams.Spliterator;
@@ -70,8 +69,7 @@
         return helper.invoke(new FindFirstTask<>(helper));
     }
 
-    private static class FindFirstTask<S, T> extends AbstractTask<S, T, Optional<T>, FindFirstTask<S, T>> {
-        private volatile boolean canceled = false;
+    private static class FindFirstTask<S, T> extends AbstractShortCircuitTask<S, T, Optional<T>, FindFirstTask<S, T>> {
 
         private FindFirstTask(ParallelPipelineHelper<S, T> helper) {
             super(helper);
@@ -86,60 +84,40 @@
             return new FindFirstTask<>(this, spliterator);
         }
 
-        private boolean taskCancelled() {
-            boolean cancel = canceled;
-            if (!cancel)
-                for (FindFirstTask<S, T> parent = getParent(); !cancel && parent != null; parent = parent.getParent())
-                    cancel = parent.canceled;
-            return cancel;
+        @Override
+        protected Optional<T> getEmptyResult() {
+            return Optional.empty();
         }
 
-        @Override
-        public void compute() {
-            if (taskCancelled()) {
-                setRawResult(Optional.<T>empty());
-                helpComplete();
-            }
+        private void foundResult(Optional<T> answer) {
+            if (isLeftSpine())
+                shortCircuit(answer);
             else
-                super.compute();
+                cancelLaterNodes();
         }
 
         @Override
         protected Optional<T> doLeaf() {
             Iterator<T> iterator = helper.wrapIterator(spliterator.iterator());
-            if (iterator.hasNext())
-                return new Optional<>(iterator.next());
+            if (iterator.hasNext()) {
+                Optional<T> answer = new Optional<>(iterator.next());
+                foundResult(answer);
+                return answer;
+            }
             else
-                return Optional.empty();
+                return null;
         }
 
         @Override
         public void onCompletion(CountedCompleter<?> caller) {
-            if (children == null) {
-                Optional<T> result = getRawResult();
-                if (result.isPresent())
-                    cancelLaterSiblings();
-            }
-            else {
-                for (FindFirstTask<S, T> child = children; child != null; child = child.nextSibling) {
-                    Optional<T> result = child.getRawResult();
+            for (FindFirstTask<S, T> child = children; child != null; child = child.nextSibling) {
+                Optional<T> result = child.getRawResult();
+                if (result != null && result.isPresent()) {
                     setRawResult(result);
-                    if (result.isPresent()) {
-                        cancelLaterSiblings();
-                        break;
-                    }
+                    foundResult(result);
+                    break;
                 }
             }
         }
-
-        private void cancelLaterSiblings() {
-            FindFirstTask<S, T> parent = getParent();
-            for (FindFirstTask<S, T> sibling = this.nextSibling; sibling != null; sibling = sibling.nextSibling)
-                sibling.canceled = true;
-            // If we are the leftmost child of the parent, then we should cancel the parent's later siblings too
-            // We could be more aggressve, and actually complete the parent here when the leftmost child completes
-            if (parent != null && parent.children == this)
-                parent.cancelLaterSiblings();
-        }
     }
 }
--- a/src/share/classes/java/util/streams/ops/LimitOp.java	Mon Nov 12 20:17:39 2012 -0500
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,119 +0,0 @@
-/*
- * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved.
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This code is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 only, as
- * published by the Free Software Foundation.  Oracle designates this
- * particular file as subject to the "Classpath" exception as provided
- * by Oracle in the LICENSE file that accompanied this code.
- *
- * This code is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
- * version 2 for more details (a copy is included in the LICENSE file that
- * accompanied this code).
- *
- * You should have received a copy of the GNU General Public License version
- * 2 along with this work; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
- *
- * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
- * or visit www.oracle.com if you need additional information or have any
- * questions.
- */
-package java.util.streams.ops;
-
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-import java.util.Objects;
-import java.util.streams.ParallelPipelineHelper;
-import java.util.streams.Sink;
-import java.util.streams.StreamOpFlags;
-
-
-public class LimitOp<T> implements StatefulOp<T, T> {
-
-    private final int limit;
-
-    public LimitOp(int limit) {
-        if (limit < 0)
-            throw new IllegalArgumentException("Limit must be non-negative: " + limit);
-
-        this.limit = limit;
-    }
-
-    @Override
-    public int getOpFlags() {
-        return StreamOpFlags.NOT_SIZED | StreamOpFlags.IS_SHORT_CIRCUIT;
-    }
-
-    @Override
-    public Sink<T> wrapSink(int flags, Sink sink) {
-        // @@@ Cannot short circuit the sink
-        // @@@ This smells somewhat
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public Iterator<T> wrapIterator(int flags, Iterator<T> source) {
-        return iterator(source, limit);
-    }
-
-    @Override
-    public <S> Node<T> evaluateParallel(ParallelPipelineHelper<S, T> helper) {
-        // Dumb serial implementation defering to iterator
-        final Iterator<T> i = wrapIterator(helper.getStreamFlags(), helper.iterator());
-
-        final int size = Math.min(helper.getOutputSizeIfKnown(), limit);
-
-        final NodeBuilder<T> nb = Nodes.makeBuilder(size);
-
-        nb.begin(size);
-        while (i.hasNext())
-            nb.accept(i.next());
-        nb.end();
-
-        return nb.build();
-    }
-
-    public static <T> Iterator<T> iterator(Iterator<T> source, int limit) {
-        return (limit > 0)
-                ? new LimitingIterator<>(source, limit)
-                : Collections.emptyIterator();
-    }
-
-    static class LimitingIterator<T> implements Iterator<T> {
-
-        private final Iterator<T> source;
-
-        private int limit;
-
-        public LimitingIterator(Iterator<T> source, int limit) {
-            Objects.requireNonNull(source);
-            if (limit < 0)
-                throw new IllegalArgumentException("Limit must be non-negative: " + limit);
-
-            this.source = source;
-            this.limit = limit;
-        }
-
-        @Override
-        public boolean hasNext() {
-            if (limit > 0)
-                return source.hasNext();
-
-            return false;
-        }
-
-        @Override
-        public T next() {
-            if (!hasNext())
-                throw new NoSuchElementException("No Current Element");
-
-            limit--;
-            return source.next();
-        }
-    }
-}
--- a/src/share/classes/java/util/streams/ops/MatchOp.java	Mon Nov 12 20:17:39 2012 -0500
+++ b/src/share/classes/java/util/streams/ops/MatchOp.java	Wed Nov 14 11:41:51 2012 -0500
@@ -78,25 +78,20 @@
         // - if b == matchKind.shortCircuitOn, complete early and return b
         // - else if we complete normally, return !shortCircuitOn
 
-        MatchTask<S, T> task = new MatchTask<>(this, helper);
-        helper.invoke(task);
-        return task.answer.get();
+        return helper.invoke(new MatchTask<>(this, helper));
     }
 
-    private static class MatchTask<S, T> extends AbstractTask<S, T, Boolean, MatchTask<S, T>> {
+    private static class MatchTask<S, T> extends AbstractShortCircuitTask<S, T, Boolean, MatchTask<S, T>> {
         private final MatchOp<T> op;
-        private final AtomicReference<Boolean> answer;
 
         private MatchTask(MatchOp<T> op, ParallelPipelineHelper<S, T> helper) {
             super(helper);
             this.op = op;
-            this.answer = new AtomicReference<>(null);
         }
 
         private MatchTask(MatchTask<S, T> parent, Spliterator<S> spliterator) {
             super(parent, spliterator);
             this.op = parent.op;
-            this.answer = parent.answer;
         }
 
         @Override
@@ -105,28 +100,16 @@
         }
 
         @Override
-        public void compute() {
-            // Have we already found an answer?
-            if (answer.get() != null)
-                helpComplete();
-            else
-                super.compute();
+        protected Boolean doLeaf() {
+            boolean b = op.evaluate(helper.wrapIterator(spliterator.iterator()));
+            if (b == op.matchKind.shortCircuitResult)
+                shortCircuit(b);
+            return null;
         }
 
         @Override
-        protected Boolean doLeaf() {
-            boolean b = op.evaluate(helper.wrapIterator(spliterator.iterator()));
-            if (b == op.matchKind.shortCircuitResult) {
-                answer.compareAndSet(null, b);
-                completeRoot(b);
-            }
-            return b;
-        }
-
-        @Override
-        public void onCompletion(CountedCompleter caller) {
-            if (getParent() == null)
-                answer.compareAndSet(null, !op.matchKind.shortCircuitResult);
+        protected Boolean getEmptyResult() {
+            return !op.matchKind.shortCircuitResult;
         }
     }
 
--- a/src/share/classes/java/util/streams/ops/Nodes.java	Mon Nov 12 20:17:39 2012 -0500
+++ b/src/share/classes/java/util/streams/ops/Nodes.java	Wed Nov 14 11:41:51 2012 -0500
@@ -31,6 +31,46 @@
 
 public class Nodes {
 
+    private static final Node EMPTY_NODE = new EmptyNode();
+    private static final Object[] EMPTY_OBJECT_ARRAY = new Object[0];
+
+    public static<T> Node<T> emptyNode() {
+        return (Node<T>) EMPTY_NODE;
+    }
+
+    private static class EmptyNode<T> implements Node<T> {
+        @Override
+        public Spliterator<T> spliterator() {
+            return Streams. emptySpliterator();
+        }
+
+        @Override
+        public Node flatten() {
+            return this;
+        }
+
+        @Override
+        public T[] asArray() {
+            return (T[]) EMPTY_OBJECT_ARRAY;
+        }
+
+        @Override
+        public void copyInto(T[] array, int offset) throws IndexOutOfBoundsException { }
+
+        @Override
+        public Iterator<T> iterator() {
+            return Collections.emptyIterator();
+        }
+
+        @Override
+        public void forEach(Block<? super T> block) { }
+
+        @Override
+        public int size() {
+            return 0;
+        }
+    }
+
     public static<T> Node<T> node(final T[] array) {
         return new ArrayNode<>(array);
     }
--- a/src/share/classes/java/util/streams/ops/OpUtils.java	Mon Nov 12 20:17:39 2012 -0500
+++ b/src/share/classes/java/util/streams/ops/OpUtils.java	Wed Nov 14 11:41:51 2012 -0500
@@ -147,7 +147,7 @@
 
         @Override
         public void onCompletion(CountedCompleter caller) {
-            if (!isLeaf) {
+            if (!isLeaf()) {
                 ReduceTask<P_IN, P_OUT, R, S> child = children;
                 S result = child.getRawResult();
                 child = child.nextSibling;
--- a/src/share/classes/java/util/streams/ops/SkipOp.java	Mon Nov 12 20:17:39 2012 -0500
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,84 +0,0 @@
-/*
- * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved.
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This code is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 only, as
- * published by the Free Software Foundation.  Oracle designates this
- * particular file as subject to the "Classpath" exception as provided
- * by Oracle in the LICENSE file that accompanied this code.
- *
- * This code is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
- * version 2 for more details (a copy is included in the LICENSE file that
- * accompanied this code).
- *
- * You should have received a copy of the GNU General Public License version
- * 2 along with this work; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
- *
- * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
- * or visit www.oracle.com if you need additional information or have any
- * questions.
- */
-package java.util.streams.ops;
-
-import java.util.Iterator;
-import java.util.Objects;
-import java.util.streams.Sink;
-import java.util.streams.StreamOpFlags;
-
-public class SkipOp<T> implements StatefulOp<T, T> {
-
-    private final int skip;
-
-    public SkipOp(int skip) {
-        if (skip < 0)
-            throw new IllegalArgumentException("Skip must be non-negative: " + skip);
-
-        this.skip = skip;
-    }
-
-    @Override
-    public int getOpFlags() {
-        return StreamOpFlags.NOT_SIZED;
-    }
-
-    @Override
-    public Sink<T> wrapSink(int flags, final Sink sink) {
-        Objects.requireNonNull(sink);
-        return new Sink.ChainedValue<T>(sink) {
-            int n = skip;
-
-            @Override
-            public void accept(T t) {
-                if (n == 0) {
-                    downstream.accept(t);
-                }
-                else {
-                    n--;
-                }
-            }
-        };
-    }
-
-    @Override
-    public Iterator<T> wrapIterator(int flags, Iterator<T> source) {
-        return iterator(source, skip);
-    }
-
-    public static <T> Iterator<T> iterator(Iterator<T> source, int skip) {
-        if (skip < 0)
-            throw new IllegalArgumentException("Skip must be non-negative: " + skip);
-
-        // @@@ Should this be performed by a wrapping iterator on first call to hasNext/next?
-        // Skip elements in the source
-        while (skip > 0 && source.hasNext()) {
-            source.next();
-            skip--;
-        }
-
-        return source;
-    }
-}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/share/classes/java/util/streams/ops/SliceOp.java	Wed Nov 14 11:41:51 2012 -0500
@@ -0,0 +1,278 @@
+/*
+ * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.  Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+package java.util.streams.ops;
+
+import java.util.*;
+import java.util.concurrent.CountedCompleter;
+import java.util.streams.ParallelPipelineHelper;
+import java.util.streams.Sink;
+import java.util.streams.Spliterator;
+import java.util.streams.StreamOpFlags;
+
+/**
+ * SliceOp
+ *
+ * @author Brian Goetz
+ */
+public class SliceOp<T> implements StatefulOp<T, T> {
+
+    private final int skip;
+    private final int limit;
+
+    public SliceOp(int skip, int limit) {
+        if (limit < 0)
+            throw new IllegalArgumentException("Limit must be non-negative: " + limit);
+        if (skip < 0)
+            throw new IllegalArgumentException("Skip must be non-negative: " + skip);
+        this.skip = skip;
+        this.limit = limit;
+    }
+
+    public SliceOp(int skip) {
+        if (skip < 0)
+            throw new IllegalArgumentException("Skip must be non-negative: " + skip);
+        this.skip = skip;
+        this.limit = -1;
+    }
+
+    @Override
+    public int getOpFlags() {
+        return StreamOpFlags.NOT_SIZED | ((limit != -1) ? StreamOpFlags.IS_SHORT_CIRCUIT : 0);
+    }
+
+    private int getFinalSize(ParallelPipelineHelper helper) {
+        int size = helper.getOutputSizeIfKnown();
+        if (size >= 0) {
+            size = Math.max(0, size - skip);
+            if (limit >= 0)
+                size = Math.min(size, limit);
+        }
+        return size;
+    }
+
+    @Override
+    public Sink<T> wrapSink(int flags, Sink sink) {
+        Objects.requireNonNull(sink);
+        if (limit == -1) {
+            return new Sink.ChainedValue<T>(sink) {
+                int n = skip;
+
+                @Override
+                public void accept(T t) {
+                    if (n == 0) {
+                        downstream.accept(t);
+                    }
+                    else {
+                        n--;
+                    }
+                }
+            };
+        }
+        else
+            // No push for short-circuit operations
+            throw new IllegalStateException();
+    }
+
+    @Override
+    public Iterator<T> wrapIterator(int flags, Iterator<T> source) {
+        return (limit == 0)
+               ? Collections.emptyIterator()
+               : new SliceIterator<>(source, skip, limit);
+    }
+
+    @Override
+    public <S> Node<T> evaluateParallel(ParallelPipelineHelper<S, T> helper) {
+        // Dumb serial implementation defering to iterator
+        final Iterator<T> i = wrapIterator(helper.getStreamFlags(), helper.iterator());
+        final int size = getFinalSize(helper);
+        final NodeBuilder<T> nb = Nodes.makeBuilder(size);
+
+        nb.begin(size);
+        while (i.hasNext())
+            nb.accept(i.next());
+        nb.end();
+
+        return nb.build();
+    }
+
+//    @Override
+//    public <S> Node<T> evaluateParallel(ParallelPipelineHelper<S, T> helper) {
+//        // Parallel strategy -- two cases
+//        // IF we have full size information
+//        // - decompose, keeping track of each leaf's (offset, size)
+//        // - calculate leaf only if intersection between (offset, size) and desired slice
+//        // - Construct a Node containing the appropriate sections of the appropriate leaves
+//        // IF we don't
+//        // - decompose, and calculate size of each leaf
+//        // - on complete, if we are a left-spine node, and we have the desired slice covered, cancel rest
+//        // - @@@ this can be significantly improved
+//
+//        Spliterator<S> spliterator = helper.spliterator();
+//        int size = spliterator.getSizeIfKnown();
+//        if (size >= 0 && helper.getOutputSizeIfKnown() == size && spliterator.isPredictableSplits())
+//            return helper.invoke(new SizedSliceTask<>(helper, skip, getFinalSize(helper)));
+//        else
+//            return helper.invoke(new SliceTask<>(helper));
+//    }
+
+    private static class SliceIterator<T> implements Iterator<T> {
+
+        private final Iterator<T> source;
+
+        private int toSkip;
+        private int toConsume;
+        private boolean noLimit;
+
+        public SliceIterator(Iterator<T> source, int skip, int limit) {
+            Objects.requireNonNull(source);
+            noLimit = (limit == -1);
+
+            this.source = source;
+            this.toSkip = skip;
+            this.toConsume = limit;
+        }
+
+        @Override
+        public boolean hasNext() {
+            while (toSkip > 0 && source.hasNext()) {
+                source.next();
+                toSkip--;
+            }
+            if (toSkip > 0)
+                return false;
+            else if (noLimit || toConsume > 0)
+                return source.hasNext();
+            else
+                return false;
+        }
+
+        @Override
+        public T next() {
+            if (!hasNext())
+                throw new NoSuchElementException("No Current Element");
+
+            toConsume--;
+            return source.next();
+        }
+    }
+
+    private static class SizedSliceTask<S, T> extends AbstractCancelableTask<S, T, Node<T>, SizedSliceTask<S, T>> {
+        private final int targetOffset, targetSize;
+        private final int offset, size;
+
+        private SizedSliceTask(ParallelPipelineHelper<S, T> helper, int offset, int size) {
+            super(helper);
+            targetOffset = offset;
+            targetSize = size;
+            this.offset = 0;
+            this.size = spliterator.getSizeIfKnown();
+        }
+
+        private SizedSliceTask(SizedSliceTask<S, T> parent, Spliterator<S> spliterator) {
+            // Makes assumptions about order in which siblings are created and linked into parent!
+            super(parent, spliterator);
+            targetOffset = parent.targetOffset;
+            targetSize = parent.targetSize;
+            int siblingSizes = 0;
+            for (SizedSliceTask<S, T> sibling = parent.children; sibling != null; sibling = sibling.nextSibling)
+                siblingSizes += sibling.size;
+            size = spliterator.getSizeIfKnown();
+            offset = parent.offset + siblingSizes;
+        }
+
+        @Override
+        protected SizedSliceTask<S, T> makeChild(Spliterator<S> spliterator) {
+            return new SizedSliceTask<>(this, spliterator);
+        }
+
+        @Override
+        protected Node<T> getEmptyResult() {
+            return Nodes.emptyNode();
+        }
+
+        @Override
+        public boolean taskCancelled() {
+            if (offset > targetOffset+targetSize || offset+size < targetOffset)
+                return true;
+            else
+                return super.taskCancelled();
+        }
+
+        @Override
+        protected Node<T> doLeaf() {
+            // @@@ If we're the first or last node, peel off the irrelevant elements manually
+            return helper.into(Nodes.<T>makeBuilder(spliterator.getSizeIfKnown())).build();
+        }
+
+        @Override
+        public void onCompletion(CountedCompleter<?> caller) {
+            if (!isLeaf()) {
+                Node<T> result = children.getRawResult();
+                for (SizedSliceTask<S, T> child = children.nextSibling; child != null; child = child.nextSibling)
+                    result = Nodes.node(result, child.getRawResult());
+                setRawResult(result);
+                // @@@ If we're done, complete the parent and cancel later siblings
+            }
+        }
+    }
+
+    private static class SliceTask<S, T> extends AbstractCancelableTask<S, T, Node<T>, SliceTask<S, T>> {
+        private final int targetOffset, targetSize;
+
+        private SliceTask(ParallelPipelineHelper<S, T> helper, int offset, int size) {
+            super(helper);
+            targetOffset = offset;
+            targetSize = size;
+        }
+
+        private SliceTask(SliceTask<S, T> parent, Spliterator<S> spliterator) {
+            super(parent, spliterator);
+            targetOffset = parent.targetOffset;
+            targetSize = parent.targetSize;
+        }
+
+        @Override
+        protected SliceTask<S, T> makeChild(Spliterator<S> spliterator) {
+            return new SliceTask<>(this, spliterator);
+        }
+
+        @Override
+        protected Node<T> getEmptyResult() {
+            return Nodes.emptyNode();
+        }
+
+        @Override
+        protected Node<T> doLeaf() {
+            return helper.into(Nodes.<T>makeBuilder(spliterator.getSizeIfKnown())).build();
+        }
+
+        @Override
+        public void onCompletion(CountedCompleter<?> caller) {
+            if (!isLeaf()) {
+                // @@@
+            }
+        }
+    }
+}
--- a/src/share/classes/java/util/streams/ops/TreeUtils.java	Mon Nov 12 20:17:39 2012 -0500
+++ b/src/share/classes/java/util/streams/ops/TreeUtils.java	Wed Nov 14 11:41:51 2012 -0500
@@ -115,7 +115,7 @@
 
         @Override
         public void onCompletion(CountedCompleter caller) {
-            if (!isLeaf) {
+            if (!isLeaf()) {
                 @SuppressWarnings("unchecked")
                 Node<U>[] nodes = (Node<U>[]) new Node[numChildren];
                 int idx = 0;
--- a/test-ng/tests/org/openjdk/tests/java/util/streams/ops/LimitOpTest.java	Mon Nov 12 20:17:39 2012 -0500
+++ b/test-ng/tests/org/openjdk/tests/java/util/streams/ops/LimitOpTest.java	Wed Nov 14 11:41:51 2012 -0500
@@ -32,7 +32,7 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.streams.ops.LimitOp;
+import java.util.streams.ops.SliceOp;
 
 import static org.openjdk.tests.java.util.LambdaTestHelpers.*;
 
@@ -40,14 +40,12 @@
 @Test
 public class LimitOpTest extends StreamOpTestCase {
 
-    public void testRawIterator() {
-        assertCountSum(LimitOp.iterator(countTo(0).iterator(), 4), 0, 0);
-        assertCountSum(LimitOp.iterator(countTo(2).iterator(), 4), 2, 3);
-        assertCountSum(LimitOp.iterator(countTo(4).iterator(), 4), 4, 10);
-        assertCountSum(LimitOp.iterator(countTo(8).iterator(), 4), 4, 10);
-    }
+    public void testLimit() {
+        assertCountSum(countTo(0).stream().limit(4), 0, 0);
+        assertCountSum(countTo(2).stream().limit(4), 2, 3);
+        assertCountSum(countTo(4).stream().limit(4), 4, 10);
+        assertCountSum(countTo(8).stream().limit(4), 4, 10);
 
-    public void testLimit() {
         assertContents(Collections.<Integer>emptyList().stream().limit(0).iterator(),
                        Collections.<Integer>emptyList().iterator());
 
@@ -81,13 +79,13 @@
         List<Integer> limits = Collections.unmodifiableList(sizes(data));
 
         for (int l : limits) {
-            StreamResult<Integer> sr = exerciseOps(data, new LimitOp<>(l));
+            StreamResult<Integer> sr = exerciseOps(data, new SliceOp<>(0, l));
             assertTrue(sr.size() <= l,
                        String.format("size of stream result not within limit of %d", l));
         }
 
         for (int l : limits) {
-            StreamResult<Integer> sr = exerciseOps(data, new LimitOp<>(l), new LimitOp<>(l / 2));
+            StreamResult<Integer> sr = exerciseOps(data, new SliceOp<>(0, l), new SliceOp<>(0, l / 2));
             assertTrue(sr.size() <= l / 2,
                        String.format("size of stream result not within limit of %d", l / 2));
         }
--- a/test-ng/tests/org/openjdk/tests/java/util/streams/ops/SkipOpTest.java	Mon Nov 12 20:17:39 2012 -0500
+++ b/test-ng/tests/org/openjdk/tests/java/util/streams/ops/SkipOpTest.java	Wed Nov 14 11:41:51 2012 -0500
@@ -30,8 +30,7 @@
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
-import java.util.streams.ops.LimitOp;
-import java.util.streams.ops.SkipOp;
+import java.util.streams.ops.SliceOp;
 
 import static org.openjdk.tests.java.util.LambdaTestHelpers.*;
 
@@ -39,15 +38,13 @@
 @Test
 public class SkipOpTest extends StreamOpTestCase {
 
-    public void testRawIterator() {
-        assertCountSum(SkipOp.iterator(countTo(0).iterator(), 0), 0, 0);
-        assertCountSum(SkipOp.iterator(countTo(0).iterator(), 4), 0, 0);
-        assertCountSum(SkipOp.iterator(countTo(4).iterator(), 4), 0, 0);
-        assertCountSum(SkipOp.iterator(countTo(4).iterator(), 2), 2, 7);
-        assertCountSum(SkipOp.iterator(countTo(4).iterator(), 0), 4, 10);
-    }
+    public void testSkip() {
+        assertCountSum(countTo(0).stream().skip(0), 0, 0);
+        assertCountSum(countTo(0).stream().skip(4), 0, 0);
+        assertCountSum(countTo(4).stream().skip(4), 0, 0);
+        assertCountSum(countTo(4).stream().skip(2), 2, 7);
+        assertCountSum(countTo(4).stream().skip(0), 4, 10);
 
-    public void testSkip() {
         assertContents(Collections.<Integer>emptyList().stream().skip(0).iterator(),
                        Collections.<Integer>emptyList().iterator());
 
@@ -72,13 +69,13 @@
         List<Integer> skips = Collections.unmodifiableList(sizes(data));
 
         for (int s : skips) {
-            StreamResult<Integer> sr = exerciseOps(data, new SkipOp<>(s));
+            StreamResult<Integer> sr = exerciseOps(data, new SliceOp<>(s));
             assertTrue(data.size() - sr.size() <= s,
                        String.format("size of stream result not within skip limit of %d", s));
         }
 
         for (int s : skips) {
-            StreamResult<Integer> sr = exerciseOps(data, new SkipOp<>(s), new SkipOp<>(s / 2));
+            StreamResult<Integer> sr = exerciseOps(data, new SliceOp<>(s), new SliceOp<>(s / 2));
             assertTrue(data.size() - sr.size() <= s + s / 2,
                        String.format("size of stream result not within skip limit of %d", s + s / 2));
         }
@@ -89,7 +86,10 @@
         List<Integer> skips = Collections.unmodifiableList(sizes(data));
 
         for (int s : skips) {
-            StreamResult<Integer> sr = exerciseOps(data, new SkipOp<>(s), new LimitOp<>(10));
+            StreamResult<Integer> sr = exerciseOps(data, new SliceOp<>(s), new SliceOp<>(0, 10));
+            assertTrue(sr.size() <= 10,
+                       String.format("size of stream result not within limit of 10"));
+            sr = exerciseOps(data, new SliceOp<>(s, 10));
             assertTrue(sr.size() <= 10,
                        String.format("size of stream result not within limit of 10"));
         }