changeset 8457:8be9094c8fd8

Optimization for slice when pipeline is SIZED and source spliterator is SIZED and SUBSIZED. This also avoids OOME errors in certain cases when limit is used to limit streams of very large counts of elements.
author psandoz
date Thu, 02 May 2013 18:57:35 +0200
parents aba60ef63ee2
children ee6e2fffa75c
files src/share/classes/java/util/stream/AbstractPipeline.java src/share/classes/java/util/stream/PipelineHelper.java src/share/classes/java/util/stream/SliceOps.java src/share/classes/java/util/stream/StreamSpliterators.java test-ng/boottests/java/util/stream/SliceSpliteratorTest.java test-ng/tests/org/openjdk/tests/java/util/stream/InfiniteStreamWithLimitOpTest.java
diffstat 6 files changed, 319 insertions(+), 2 deletions(-) [+]
line wrap: on
line diff
--- a/src/share/classes/java/util/stream/AbstractPipeline.java	Thu May 02 07:51:41 2013 -0700
+++ b/src/share/classes/java/util/stream/AbstractPipeline.java	Thu May 02 18:57:35 2013 +0200
@@ -503,6 +503,16 @@
     }
 
     @Override
+    final <P_IN> Spliterator<E_OUT> wrapSpliterator(Spliterator<P_IN> sourceSpliterator) {
+        if (depth == 0) {
+            return (Spliterator<E_OUT>) sourceSpliterator;
+        }
+        else {
+            return wrap(this, () -> sourceSpliterator, isParallel());
+        }
+    }
+
+    @Override
     @SuppressWarnings("unchecked")
     final <P_IN> Node<E_OUT> evaluate(Spliterator<P_IN> spliterator,
                                       boolean flatten,
--- a/src/share/classes/java/util/stream/PipelineHelper.java	Thu May 02 07:51:41 2013 -0700
+++ b/src/share/classes/java/util/stream/PipelineHelper.java	Thu May 02 18:57:35 2013 +0200
@@ -146,6 +146,14 @@
     abstract<P_IN> Sink<P_IN> wrapSink(Sink<P_OUT> sink);
 
     /**
+     * Wraps a source spliterator as a spliterator of this helper's output type.
+     * @param spliterator the source spliterator to wrap
+     * @param <P_IN> the element type of the source spliterator
+     * @return a spliterator of {@code P_OUT} elements obtained from {@code spliterator}
+     */
+    abstract<P_IN> Spliterator<P_OUT> wrapSpliterator(Spliterator<P_IN> spliterator);
+
+    /**
      * Constructs a @{link Node.Builder} compatible with the output shape of
      * this {@code PipelineHelper}.
      *
--- a/src/share/classes/java/util/stream/SliceOps.java	Thu May 02 07:51:41 2013 -0700
+++ b/src/share/classes/java/util/stream/SliceOps.java	Thu May 02 18:57:35 2013 +0200
@@ -42,6 +42,20 @@
     private SliceOps() { }
 
     /**
+     * Calculates the sliced size given the current size, the number of
+     * elements to skip, and the number of elements to limit.
+     *
+     * @param size the current size
+     * @param skip the number of elements to skip, assumed to be >= 0
+     * @param limit the number of elements to limit, assumed to be >= 0, with
+     *        a value of {@code Long.MAX_VALUE} if there is no limit
+     * @return the sliced size, or -1 if {@code size} is negative or {@code skip} exceeds {@code size}
+     */
+    private static long calcSize(long size, long skip, long limit) {
+        return size >= 0 ? Math.max(-1, Math.min(size - skip, limit)) : -1;
+    }
+
+    /**
      * Appends a "slice" operation to the provided stream.  The slice operation
      * may be may be skip-only, limit-only, or skip-and-limit.
      *
@@ -58,11 +72,44 @@
 
         return new ReferencePipeline.StatefulOp<T,T>(upstream, StreamShape.REFERENCE,
                                                      flags(limit)) {
+
+            private <S> Spliterator<S> sliceSpliterator(Spliterator<S> s) {
+                long sliceFence = limit >= 0 ? skip + limit : Long.MAX_VALUE;
+                // Check for overflow
+                if (sliceFence < 0)
+                    sliceFence = Long.MAX_VALUE;
+                return new StreamSpliterators.SliceSpliterator<>(
+                        s,
+                        skip,
+                        sliceFence);
+            }
+
+            @Override
+            <P_IN> Spliterator<T> opEvaluateParallelLazy(PipelineHelper<T> helper, Spliterator<P_IN> spliterator) {
+                if (helper.exactOutputSizeIfKnown(spliterator) > 0 &&
+                    spliterator.hasCharacteristics(Spliterator.SUBSIZED)) {
+                    return sliceSpliterator(helper.wrapSpliterator(spliterator));
+                }
+                else {
+                    return new SliceTask<>(this, helper, spliterator, i -> (T[]) new Object[i], skip, limit).
+                            invoke().spliterator();
+                }
+            }
+
             @Override
             <P_IN> Node<T> opEvaluateParallel(PipelineHelper<T> helper,
                                               Spliterator<P_IN> spliterator,
                                               IntFunction<T[]> generator) {
-                return new SliceTask<>(this, helper, spliterator, generator, skip, limit).invoke();
+
+                if (helper.exactOutputSizeIfKnown(spliterator) > 0 &&
+                    spliterator.hasCharacteristics(Spliterator.SUBSIZED)) {
+                    // @@@ method on AbstractPipeline and not PipelineHelper
+                    return evaluateToNode(helper, sliceSpliterator(spliterator), true, generator);
+                }
+                else {
+                    return new SliceTask<>(this, helper, spliterator, generator, skip, limit).
+                            invoke();
+                }
             }
 
             @Override
@@ -72,6 +119,11 @@
                     long m = limit >= 0 ? limit : Long.MAX_VALUE;
 
                     @Override
+                    public void begin(long size) {
+                        downstream.begin(calcSize(size, skip, m));
+                    }
+
+                    @Override
                     public void accept(T t) {
                         if (n == 0) {
                             if (m > 0) {
@@ -123,6 +175,11 @@
                     long m = limit >= 0 ? limit : Long.MAX_VALUE;
 
                     @Override
+                    public void begin(long size) {
+                        downstream.begin(calcSize(size, skip, m));
+                    }
+
+                    @Override
                     public void accept(int t) {
                         if (n == 0) {
                             if (m > 0) {
@@ -174,6 +231,11 @@
                     long m = limit >= 0 ? limit : Long.MAX_VALUE;
 
                     @Override
+                    public void begin(long size) {
+                        downstream.begin(calcSize(size, skip, m));
+                    }
+
+                    @Override
                     public void accept(long t) {
                         if (n == 0) {
                             if (m > 0) {
@@ -225,6 +287,11 @@
                     long m = limit >= 0 ? limit : Long.MAX_VALUE;
 
                     @Override
+                    public void begin(long size) {
+                        downstream.begin(calcSize(size, skip, m));
+                    }
+
+                    @Override
                     public void accept(double t) {
                         if (n == 0) {
                             if (m > 0) {
--- a/src/share/classes/java/util/stream/StreamSpliterators.java	Thu May 02 07:51:41 2013 -0700
+++ b/src/share/classes/java/util/stream/StreamSpliterators.java	Thu May 02 18:57:35 2013 +0200
@@ -26,6 +26,7 @@
 
 import java.util.Comparator;
 import java.util.Spliterator;
+import java.util.Spliterators;
 import java.util.function.BooleanSupplier;
 import java.util.function.Consumer;
 import java.util.function.DoubleConsumer;
@@ -240,7 +241,7 @@
             // but for sub-splits only an estimate is known
             if ((c & Spliterator.SIZED) != 0) {
                 c &= ~(Spliterator.SIZED | Spliterator.SUBSIZED);
-                c |= (spliterator.characteristics() & Spliterator.SIZED & Spliterator.SUBSIZED);
+                c |= (spliterator.characteristics() & (Spliterator.SIZED | Spliterator.SUBSIZED));
             }
 
             return c;
@@ -633,4 +634,129 @@
             }
         }
     }
+
+    static final class SliceSpliterator<T> implements Spliterator<T> {
+        // The start index of the slice
+        final long sliceOrigin;
+        // One past the last index of the slice
+        final long sliceFence;
+
+        // The spliterator to slice
+        Spliterator<T> s;
+        // current (absolute) index, modified on advance/split
+        long index;
+        // one past last (absolute) index or sliceFence, whichever is smaller
+        long fence;
+
+        SliceSpliterator(Spliterator<T> s, long sliceOrigin, long sliceFence) {
+            this(s, sliceOrigin, sliceFence, 0, Math.min(s.estimateSize(), sliceFence));
+        }
+
+        private SliceSpliterator(Spliterator<T> s, long sliceOrigin, long sliceFence, long origin, long fence) {
+            assert s.hasCharacteristics(Spliterator.SUBSIZED);
+            this.s = s;
+            this.sliceOrigin = sliceOrigin;
+            this.sliceFence = sliceFence;
+            this.index = origin;
+            this.fence = fence;
+        }
+
+        @Override
+        public boolean tryAdvance(Consumer<? super T> action) {
+            if (sliceOrigin >= fence)
+                return false;
+
+            if (index >= fence)
+                return false;
+
+            while (sliceOrigin > index) {
+                s.tryAdvance(e -> {});
+                index++;
+            }
+
+            index++;
+            return s.tryAdvance(action);
+        }
+
+        @Override
+        public void forEachRemaining(Consumer<? super T> action) {
+            if (sliceOrigin >= fence)
+                return;
+
+            if (index >= fence)
+                return;
+
+            if (index >= sliceOrigin && (index + s.estimateSize()) <= sliceFence) {
+                // The spliterator is contained within the slice
+                s.forEachRemaining(action);
+                index = fence;
+            } else {
+                // The spliterator intersects with the slice
+                while (sliceOrigin > index) {
+                    s.tryAdvance(e -> {});
+                    index++;
+                }
+                // Traverse elements up to the fence
+                for (;index < fence; index++) {
+                    s.tryAdvance(action);
+                }
+            }
+        }
+
+        @Override
+        public Spliterator<T> trySplit() {
+            if (sliceOrigin >= fence)
+                return null;
+
+            if (index >= fence)
+                return null;
+
+            // Keep splitting until the left and right splits intersect with the slice
+            // thereby ensuring the size estimate decreases.
+            // This also avoids creating empty spliterators which can result in
+            // existing and additionally created F/J tasks that perform
+            // redundant work on no elements.
+            while (true) {
+                Spliterator<T> leftSplit = s.trySplit();
+                if (leftSplit == null)
+                    return null;
+
+                long leftSplitFenceUnbounded = index + leftSplit.estimateSize();
+                long leftSplitFence = Math.min(leftSplitFenceUnbounded, sliceFence);
+                if (sliceOrigin >= leftSplitFence) {
+                    // The left split does not intersect with, and is to the left of, the slice
+                    // The right split does intersect
+                    // Discard the left split and split further with the right split
+                    index = leftSplitFence;
+                }
+                else if (leftSplitFence >= sliceFence) {
+                    // The right split does not intersect with, and is to the right of, the slice
+                    // The left split does intersect
+                    // Discard the right split and split further with the left split
+                    s = leftSplit;
+                    fence = leftSplitFence;
+                }
+                else if (index >= sliceOrigin && leftSplitFenceUnbounded <= sliceFence) {
+                    // The left split is contained within the slice, return the underlying left split
+                    // Right split is contained within or intersects with the slice
+                    index = leftSplitFence;
+                    return leftSplit;
+                } else {
+                    // The left split intersects with the slice
+                    // Right split is contained within or intersects with the slice
+                    return new SliceSpliterator<>(leftSplit, sliceOrigin, sliceFence, index, index = leftSplitFence);
+                }
+            }
+        }
+
+        @Override
+        public long estimateSize() {
+            return (sliceOrigin < fence) ? fence - Math.max(sliceOrigin, index) : 0; // empty (never negative) when the slice starts at/after the fence
+        }
+
+        @Override
+        public int characteristics() {
+            return s.characteristics();
+        }
+    }
 }
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/test-ng/boottests/java/util/stream/SliceSpliteratorTest.java	Thu May 02 18:57:35 2013 +0200
@@ -0,0 +1,78 @@
+/*
+ * Copyright (c) 2013, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.  Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+package java.util.stream;
+
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Spliterator;
+
+import static org.testng.Assert.assertEquals;
+
+@Test
+public class SliceSpliteratorTest {
+    public void testOneSplit() {
+        for (int i = 0; i < 100; i++) {
+            for (int j = i; j < 100; j++) {
+                Spliterator<Integer> s = Arrays.spliterator(IntStream.range(0, 100).
+                        boxed().toArray(Integer[]::new));
+
+                StreamSpliterators.SliceSpliterator<Integer> sliceS =
+                        new StreamSpliterators.SliceSpliterator<>(s, i, j);
+
+                List<Integer> l = new ArrayList<>();
+                for (int k = i; k < j; k++) {
+                    l.add(k);
+                }
+
+                List<Integer> sl = new ArrayList<>();
+
+                Spliterator<Integer> split = sliceS.trySplit();
+                if (split != null)
+                    split.forEachRemaining(sl::add);
+                sliceS.forEachRemaining(sl::add);
+
+                assertEquals(sl, l);
+            }
+        }
+    }
+
+    public void testSpliterator() {
+        for (int i = 0; i < 100; i++) {
+            for (int j = i; j < 100; j++) {
+                final int origin = i;
+                final int fence = j;
+                SpliteratorTestHelper.testSpliterator(() -> {
+                    Spliterator<Integer> s = Arrays.spliterator(IntStream.range(0, 100).
+                            boxed().toArray(Integer[]::new));
+
+                    return new StreamSpliterators.SliceSpliterator<>(s, origin, fence);
+                });
+            }
+        }
+    }
+}
--- a/test-ng/tests/org/openjdk/tests/java/util/stream/InfiniteStreamWithLimitOpTest.java	Thu May 02 07:51:41 2013 -0700
+++ b/test-ng/tests/org/openjdk/tests/java/util/stream/InfiniteStreamWithLimitOpTest.java	Thu May 02 18:57:35 2013 +0200
@@ -24,6 +24,7 @@
  */
 package org.openjdk.tests.java.util.stream;
 
+import java.util.stream.IntStream;
 import java.util.stream.OpTestCase;
 import org.testng.annotations.Test;
 
@@ -32,7 +33,9 @@
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static java.util.stream.Collectors.toList;
 import static java.util.stream.LambdaTestHelpers.assertContents;
+import static java.util.stream.LambdaTestHelpers.toBoxedList;
 
 
 @Test
@@ -40,6 +43,31 @@
 
     private static final List<String> tenAs = Arrays.asList("A", "A", "A", "A", "A", "A", "A", "A", "A", "A");
 
+    public void testParallelSizedSubSizedLimit() {
+        {
+            String[] a = Stream.generate(() -> "A").parallel().limit(1 << 20).toArray(String[]::new);
+            assertEquals(a.length, 1 << 20);
+
+            List<String> l = Stream.generate(() -> "A").parallel().limit(1 << 20).collect(toList());
+            assertEquals(l.size(), 1 << 20);
+        }
+
+        {
+            Integer[] a = IntStream.range(0, Integer.MAX_VALUE).parallel().boxed().limit(1 << 20).toArray(Integer[]::new);
+            Integer[] sa = IntStream.range(0, Integer.MAX_VALUE).boxed().limit(1 << 20).toArray(Integer[]::new);
+            assertEquals(a, sa);
+
+            List<Integer> l = IntStream.range(0, Integer.MAX_VALUE).parallel().boxed().limit(1 << 20).collect(toList());
+            List<Integer> sl = IntStream.range(0, Integer.MAX_VALUE).boxed().limit(1 << 20).collect(toList());
+            assertEquals(l, sl);
+        }
+
+        {
+            Stream.generate(() -> "A").parallel().limit(1 << 20).iterator().next();
+            IntStream.range(0, Integer.MAX_VALUE).parallel().boxed().limit(1 << 20).iterator().next();
+        }
+    }
+
     public void testRepeatLimit() {
         assertContents(Stream.generate(() -> "A").limit(10).iterator(), tenAs.iterator());
     }