changeset 13561:9772eb3c8097

8075939: Stream.flatMap() causes breaking of short-circuiting of terminal operations Reviewed-by: forax, smarks, andrew
author shade
date Wed, 05 Jun 2019 03:05:33 +0100
parents e177f8049f2f
children ef03b9bc2874
files src/share/classes/java/util/stream/DoublePipeline.java src/share/classes/java/util/stream/IntPipeline.java src/share/classes/java/util/stream/LongPipeline.java src/share/classes/java/util/stream/ReferencePipeline.java src/share/classes/java/util/stream/SortedOps.java test/java/util/stream/test/org/openjdk/tests/java/util/stream/FlatMapOpTest.java
diffstat 6 files changed, 263 insertions(+), 50 deletions(-) [+]
line wrap: on
line diff
--- a/src/share/classes/java/util/stream/DoublePipeline.java	Thu Dec 13 00:46:39 2018 +0900
+++ b/src/share/classes/java/util/stream/DoublePipeline.java	Wed Jun 05 03:05:33 2019 +0100
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2013, 2014, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2013, 2017, Oracle and/or its affiliates. All rights reserved.
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  *
  * This code is free software; you can redistribute it and/or modify it
@@ -260,6 +260,12 @@
             @Override
             Sink<Double> opWrapSink(int flags, Sink<Double> sink) {
                 return new Sink.ChainedDouble<Double>(sink) {
+                    // true if cancellationRequested() has been called
+                    boolean cancellationRequestedCalled;
+
+                    // cache the consumer to avoid creation on every accepted element
+                    DoubleConsumer downstreamAsDouble = downstream::accept;
+
                     @Override
                     public void begin(long size) {
                         downstream.begin(-1);
@@ -268,11 +274,27 @@
                     @Override
                     public void accept(double t) {
                         try (DoubleStream result = mapper.apply(t)) {
-                            // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
-                            if (result != null)
-                                result.sequential().forEach(i -> downstream.accept(i));
+                            if (result != null) {
+                                if (!cancellationRequestedCalled) {
+                                    result.sequential().forEach(downstreamAsDouble);
+                                }
+                                else {
+                                    Spliterator.OfDouble s = result.sequential().spliterator();
+                                    do { } while (!downstream.cancellationRequested() && s.tryAdvance(downstreamAsDouble));
+                                }
+                            }
                         }
                     }
+
+                    @Override
+                    public boolean cancellationRequested() {
+                        // If this method is called then an operation within the stream
+                        // pipeline is short-circuiting (see AbstractPipeline.copyInto).
+                        // Note that we cannot differentiate between an upstream or
+                        // downstream operation
+                        cancellationRequestedCalled = true;
+                        return downstream.cancellationRequested();
+                    }
                 };
             }
         };
--- a/src/share/classes/java/util/stream/IntPipeline.java	Thu Dec 13 00:46:39 2018 +0900
+++ b/src/share/classes/java/util/stream/IntPipeline.java	Wed Jun 05 03:05:33 2019 +0100
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2012, 2014, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2012, 2017, Oracle and/or its affiliates. All rights reserved.
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  *
  * This code is free software; you can redistribute it and/or modify it
@@ -296,6 +296,12 @@
             @Override
             Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) {
                 return new Sink.ChainedInt<Integer>(sink) {
+                    // true if cancellationRequested() has been called
+                    boolean cancellationRequestedCalled;
+
+                    // cache the consumer to avoid creation on every accepted element
+                    IntConsumer downstreamAsInt = downstream::accept;
+
                     @Override
                     public void begin(long size) {
                         downstream.begin(-1);
@@ -304,11 +310,27 @@
                     @Override
                     public void accept(int t) {
                         try (IntStream result = mapper.apply(t)) {
-                            // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
-                            if (result != null)
-                                result.sequential().forEach(i -> downstream.accept(i));
+                            if (result != null) {
+                                if (!cancellationRequestedCalled) {
+                                    result.sequential().forEach(downstreamAsInt);
+                                }
+                                else {
+                                    Spliterator.OfInt s = result.sequential().spliterator();
+                                    do { } while (!downstream.cancellationRequested() && s.tryAdvance(downstreamAsInt));
+                                }
+                            }
                         }
                     }
+
+                    @Override
+                    public boolean cancellationRequested() {
+                        // If this method is called then an operation within the stream
+                        // pipeline is short-circuiting (see AbstractPipeline.copyInto).
+                        // Note that we cannot differentiate between an upstream or
+                        // downstream operation
+                        cancellationRequestedCalled = true;
+                        return downstream.cancellationRequested();
+                    }
                 };
             }
         };
--- a/src/share/classes/java/util/stream/LongPipeline.java	Thu Dec 13 00:46:39 2018 +0900
+++ b/src/share/classes/java/util/stream/LongPipeline.java	Wed Jun 05 03:05:33 2019 +0100
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2013, 2014, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2013, 2017, Oracle and/or its affiliates. All rights reserved.
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  *
  * This code is free software; you can redistribute it and/or modify it
@@ -277,6 +277,12 @@
             @Override
             Sink<Long> opWrapSink(int flags, Sink<Long> sink) {
                 return new Sink.ChainedLong<Long>(sink) {
+                    // true if cancellationRequested() has been called
+                    boolean cancellationRequestedCalled;
+
+                    // cache the consumer to avoid creation on every accepted element
+                    LongConsumer downstreamAsLong = downstream::accept;
+
                     @Override
                     public void begin(long size) {
                         downstream.begin(-1);
@@ -285,11 +291,27 @@
                     @Override
                     public void accept(long t) {
                         try (LongStream result = mapper.apply(t)) {
-                            // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
-                            if (result != null)
-                                result.sequential().forEach(i -> downstream.accept(i));
+                            if (result != null) {
+                                if (!cancellationRequestedCalled) {
+                                    result.sequential().forEach(downstreamAsLong);
+                                }
+                                else {
+                                    Spliterator.OfLong s = result.sequential().spliterator();
+                                    do { } while (!downstream.cancellationRequested() && s.tryAdvance(downstreamAsLong));
+                                }
+                            }
                         }
                     }
+
+                    @Override
+                    public boolean cancellationRequested() {
+                        // If this method is called then an operation within the stream
+                        // pipeline is short-circuiting (see AbstractPipeline.copyInto).
+                        // Note that we cannot differentiate between an upstream or
+                        // downstream operation
+                        cancellationRequestedCalled = true;
+                        return downstream.cancellationRequested();
+                    }
                 };
             }
         };
--- a/src/share/classes/java/util/stream/ReferencePipeline.java	Thu Dec 13 00:46:39 2018 +0900
+++ b/src/share/classes/java/util/stream/ReferencePipeline.java	Wed Jun 05 03:05:33 2019 +0100
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2012, 2017, Oracle and/or its affiliates. All rights reserved.
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  *
  * This code is free software; you can redistribute it and/or modify it
@@ -251,12 +251,14 @@
     @Override
     public final <R> Stream<R> flatMap(Function<? super P_OUT, ? extends Stream<? extends R>> mapper) {
         Objects.requireNonNull(mapper);
-        // We can do better than this, by polling cancellationRequested when stream is infinite
         return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
                                      StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
             @Override
             Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
                 return new Sink.ChainedReference<P_OUT, R>(sink) {
+                    // true if cancellationRequested() has been called
+                    boolean cancellationRequestedCalled;
+
                     @Override
                     public void begin(long size) {
                         downstream.begin(-1);
@@ -265,11 +267,27 @@
                     @Override
                     public void accept(P_OUT u) {
                         try (Stream<? extends R> result = mapper.apply(u)) {
-                            // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
-                            if (result != null)
-                                result.sequential().forEach(downstream);
+                            if (result != null) {
+                                if (!cancellationRequestedCalled) {
+                                    result.sequential().forEach(downstream);
+                                }
+                                else {
+                                    Spliterator<? extends R> s = result.sequential().spliterator();
+                                    do { } while (!downstream.cancellationRequested() && s.tryAdvance(downstream));
+                                }
+                            }
                         }
                     }
+
+                    @Override
+                    public boolean cancellationRequested() {
+                        // If this method is called then an operation within the stream
+                        // pipeline is short-circuiting (see AbstractPipeline.copyInto).
+                        // Note that we cannot differentiate between an upstream or
+                        // downstream operation
+                        cancellationRequestedCalled = true;
+                        return downstream.cancellationRequested();
+                    }
                 };
             }
         };
@@ -278,13 +296,17 @@
     @Override
     public final IntStream flatMapToInt(Function<? super P_OUT, ? extends IntStream> mapper) {
         Objects.requireNonNull(mapper);
-        // We can do better than this, by polling cancellationRequested when stream is infinite
         return new IntPipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE,
                                               StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
             @Override
             Sink<P_OUT> opWrapSink(int flags, Sink<Integer> sink) {
                 return new Sink.ChainedReference<P_OUT, Integer>(sink) {
+                    // true if cancellationRequested() has been called
+                    boolean cancellationRequestedCalled;
+
+                    // cache the consumer to avoid creation on every accepted element
                     IntConsumer downstreamAsInt = downstream::accept;
+
                     @Override
                     public void begin(long size) {
                         downstream.begin(-1);
@@ -293,11 +315,23 @@
                     @Override
                     public void accept(P_OUT u) {
                         try (IntStream result = mapper.apply(u)) {
-                            // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
-                            if (result != null)
-                                result.sequential().forEach(downstreamAsInt);
+                            if (result != null) {
+                                if (!cancellationRequestedCalled) {
+                                    result.sequential().forEach(downstreamAsInt);
+                                }
+                                else {
+                                    Spliterator.OfInt s = result.sequential().spliterator();
+                                    do { } while (!downstream.cancellationRequested() && s.tryAdvance(downstreamAsInt));
+                                }
+                            }
                         }
                     }
+
+                    @Override
+                    public boolean cancellationRequested() {
+                        cancellationRequestedCalled = true;
+                        return downstream.cancellationRequested();
+                    }
                 };
             }
         };
@@ -306,13 +340,17 @@
     @Override
     public final DoubleStream flatMapToDouble(Function<? super P_OUT, ? extends DoubleStream> mapper) {
         Objects.requireNonNull(mapper);
-        // We can do better than this, by polling cancellationRequested when stream is infinite
         return new DoublePipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE,
                                                      StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
             @Override
             Sink<P_OUT> opWrapSink(int flags, Sink<Double> sink) {
                 return new Sink.ChainedReference<P_OUT, Double>(sink) {
+                    // true if cancellationRequested() has been called
+                    boolean cancellationRequestedCalled;
+
+                    // cache the consumer to avoid creation on every accepted element
                     DoubleConsumer downstreamAsDouble = downstream::accept;
+
                     @Override
                     public void begin(long size) {
                         downstream.begin(-1);
@@ -321,11 +359,23 @@
                     @Override
                     public void accept(P_OUT u) {
                         try (DoubleStream result = mapper.apply(u)) {
-                            // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
-                            if (result != null)
-                                result.sequential().forEach(downstreamAsDouble);
+                            if (result != null) {
+                                if (!cancellationRequestedCalled) {
+                                    result.sequential().forEach(downstreamAsDouble);
+                                }
+                                else {
+                                    Spliterator.OfDouble s = result.sequential().spliterator();
+                                    do { } while (!downstream.cancellationRequested() && s.tryAdvance(downstreamAsDouble));
+                                }
+                            }
                         }
                     }
+
+                    @Override
+                    public boolean cancellationRequested() {
+                        cancellationRequestedCalled = true;
+                        return downstream.cancellationRequested();
+                    }
                 };
             }
         };
@@ -340,7 +390,12 @@
             @Override
             Sink<P_OUT> opWrapSink(int flags, Sink<Long> sink) {
                 return new Sink.ChainedReference<P_OUT, Long>(sink) {
+                    // true if cancellationRequested() has been called
+                    boolean cancellationRequestedCalled;
+
+                    // cache the consumer to avoid creation on every accepted element
                     LongConsumer downstreamAsLong = downstream::accept;
+
                     @Override
                     public void begin(long size) {
                         downstream.begin(-1);
@@ -349,11 +404,23 @@
                     @Override
                     public void accept(P_OUT u) {
                         try (LongStream result = mapper.apply(u)) {
-                            // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
-                            if (result != null)
-                                result.sequential().forEach(downstreamAsLong);
+                            if (result != null) {
+                                if (!cancellationRequestedCalled) {
+                                    result.sequential().forEach(downstreamAsLong);
+                                }
+                                else {
+                                    Spliterator.OfLong s = result.sequential().spliterator();
+                                    do { } while (!downstream.cancellationRequested() && s.tryAdvance(downstreamAsLong));
+                                }
+                            }
                         }
                     }
+
+                    @Override
+                    public boolean cancellationRequested() {
+                        cancellationRequestedCalled = true;
+                        return downstream.cancellationRequested();
+                    }
                 };
             }
         };
--- a/src/share/classes/java/util/stream/SortedOps.java	Thu Dec 13 00:46:39 2018 +0900
+++ b/src/share/classes/java/util/stream/SortedOps.java	Wed Jun 05 03:05:33 2019 +0100
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2012, 2017, Oracle and/or its affiliates. All rights reserved.
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  *
  * This code is free software; you can redistribute it and/or modify it
@@ -304,7 +304,8 @@
     private static abstract class AbstractRefSortingSink<T> extends Sink.ChainedReference<T, T> {
         protected final Comparator<? super T> comparator;
         // @@@ could be a lazy final value, if/when support is added
-        protected boolean cancellationWasRequested;
+        // true if cancellationRequested() has been called
+        protected boolean cancellationRequestedCalled;
 
         AbstractRefSortingSink(Sink<? super T> downstream, Comparator<? super T> comparator) {
             super(downstream);
@@ -319,7 +320,11 @@
          */
         @Override
         public final boolean cancellationRequested() {
-            cancellationWasRequested = true;
+            // If this method is called then an operation within the stream
+            // pipeline is short-circuiting (see AbstractPipeline.copyInto).
+            // Note that we cannot differentiate between an upstream or
+            // downstream operation
+            cancellationRequestedCalled = true;
             return false;
         }
     }
@@ -347,7 +352,7 @@
         public void end() {
             Arrays.sort(array, 0, offset, comparator);
             downstream.begin(offset);
-            if (!cancellationWasRequested) {
+            if (!cancellationRequestedCalled) {
                 for (int i = 0; i < offset; i++)
                     downstream.accept(array[i]);
             }
@@ -386,7 +391,7 @@
         public void end() {
             list.sort(comparator);
             downstream.begin(list.size());
-            if (!cancellationWasRequested) {
+            if (!cancellationRequestedCalled) {
                 list.forEach(downstream::accept);
             }
             else {
@@ -409,7 +414,8 @@
      * Abstract {@link Sink} for implementing sort on int streams.
      */
     private static abstract class AbstractIntSortingSink extends Sink.ChainedInt<Integer> {
-        protected boolean cancellationWasRequested;
+        // true if cancellationRequested() has been called
+        protected boolean cancellationRequestedCalled;
 
         AbstractIntSortingSink(Sink<? super Integer> downstream) {
             super(downstream);
@@ -417,7 +423,7 @@
 
         @Override
         public final boolean cancellationRequested() {
-            cancellationWasRequested = true;
+            cancellationRequestedCalled = true;
             return false;
         }
     }
@@ -444,7 +450,7 @@
         public void end() {
             Arrays.sort(array, 0, offset);
             downstream.begin(offset);
-            if (!cancellationWasRequested) {
+            if (!cancellationRequestedCalled) {
                 for (int i = 0; i < offset; i++)
                     downstream.accept(array[i]);
             }
@@ -484,7 +490,7 @@
             int[] ints = b.asPrimitiveArray();
             Arrays.sort(ints);
             downstream.begin(ints.length);
-            if (!cancellationWasRequested) {
+            if (!cancellationRequestedCalled) {
                 for (int anInt : ints)
                     downstream.accept(anInt);
             }
@@ -507,7 +513,8 @@
      * Abstract {@link Sink} for implementing sort on long streams.
      */
     private static abstract class AbstractLongSortingSink extends Sink.ChainedLong<Long> {
-        protected boolean cancellationWasRequested;
+        // true if cancellationRequested() has been called
+        protected boolean cancellationRequestedCalled;
 
         AbstractLongSortingSink(Sink<? super Long> downstream) {
             super(downstream);
@@ -515,7 +522,7 @@
 
         @Override
         public final boolean cancellationRequested() {
-            cancellationWasRequested = true;
+            cancellationRequestedCalled = true;
             return false;
         }
     }
@@ -542,7 +549,7 @@
         public void end() {
             Arrays.sort(array, 0, offset);
             downstream.begin(offset);
-            if (!cancellationWasRequested) {
+            if (!cancellationRequestedCalled) {
                 for (int i = 0; i < offset; i++)
                     downstream.accept(array[i]);
             }
@@ -582,7 +589,7 @@
             long[] longs = b.asPrimitiveArray();
             Arrays.sort(longs);
             downstream.begin(longs.length);
-            if (!cancellationWasRequested) {
+            if (!cancellationRequestedCalled) {
                 for (long aLong : longs)
                     downstream.accept(aLong);
             }
@@ -605,7 +612,8 @@
      * Abstract {@link Sink} for implementing sort on long streams.
      */
     private static abstract class AbstractDoubleSortingSink extends Sink.ChainedDouble<Double> {
-        protected boolean cancellationWasRequested;
+        // true if cancellationRequested() has been called
+        protected boolean cancellationRequestedCalled;
 
         AbstractDoubleSortingSink(Sink<? super Double> downstream) {
             super(downstream);
@@ -613,7 +621,7 @@
 
         @Override
         public final boolean cancellationRequested() {
-            cancellationWasRequested = true;
+            cancellationRequestedCalled = true;
             return false;
         }
     }
@@ -640,7 +648,7 @@
         public void end() {
             Arrays.sort(array, 0, offset);
             downstream.begin(offset);
-            if (!cancellationWasRequested) {
+            if (!cancellationRequestedCalled) {
                 for (int i = 0; i < offset; i++)
                     downstream.accept(array[i]);
             }
@@ -680,7 +688,7 @@
             double[] doubles = b.asPrimitiveArray();
             Arrays.sort(doubles);
             downstream.begin(doubles.length);
-            if (!cancellationWasRequested) {
+            if (!cancellationRequestedCalled) {
                 for (double aDouble : doubles)
                     downstream.accept(aDouble);
             }
--- a/test/java/util/stream/test/org/openjdk/tests/java/util/stream/FlatMapOpTest.java	Thu Dec 13 00:46:39 2018 +0900
+++ b/test/java/util/stream/test/org/openjdk/tests/java/util/stream/FlatMapOpTest.java	Wed Jun 05 03:05:33 2019 +0100
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2012, 2016, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2012, 2017, Oracle and/or its affiliates. All rights reserved.
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  *
  * This code is free software; you can redistribute it and/or modify it
@@ -24,7 +24,7 @@
 /*
  * @test
  * @summary flat-map operations
- * @bug 8044047 8076458
+ * @bug 8044047 8076458 8075939
  */
 
 package org.openjdk.tests.java.util.stream;
@@ -36,6 +36,7 @@
 import java.util.Collections;
 import java.util.function.Function;
 import java.util.stream.*;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static java.util.stream.LambdaTestHelpers.*;
 import static java.util.stream.ThowableHelper.checkNPE;
@@ -43,6 +44,7 @@
 @Test
 public class FlatMapOpTest extends OpTestCase {
 
+    @Test
     public void testNullMapper() {
         checkNPE(() -> Stream.of(1).flatMap(null));
         checkNPE(() -> IntStream.of(1).flatMap(null));
@@ -53,6 +55,7 @@
     static final Function<Integer, Stream<Integer>> integerRangeMapper
             = e -> IntStream.range(0, e).boxed();
 
+    @Test
     public void testFlatMap() {
         String[] stringsArray = {"hello", "there", "", "yada"};
         Stream<String> strings = Arrays.asList(stringsArray).stream();
@@ -85,11 +88,24 @@
         exerciseOps(data, s -> s.flatMap((Integer e) -> IntStream.range(0, e).boxed().limit(10)));
     }
 
+    @Test
+    public void testOpsShortCircuit() {
+        AtomicInteger count = new AtomicInteger();
+        Stream.of(0).flatMap(i -> IntStream.range(0, 100).boxed()).
+                peek(i -> count.incrementAndGet()).
+                limit(10).toArray();
+        assertEquals(count.get(), 10);
+    }
+
     //
 
     @Test(dataProvider = "IntStreamTestData", dataProviderClass = IntStreamTestDataProvider.class)
     public void testIntOps(String name, TestData.OfInt data) {
-        Collection<Integer> result = exerciseOps(data, s -> s.flatMap(i -> Collections.singleton(i).stream().mapToInt(j -> j)));
+        Collection<Integer> result = exerciseOps(data, s -> s.flatMap(IntStream::of));
+        assertEquals(data.size(), result.size());
+        assertContents(data, result);
+
+        result = exerciseOps(data, s -> s.boxed().flatMapToInt(IntStream::of));
         assertEquals(data.size(), result.size());
         assertContents(data, result);
 
@@ -101,13 +117,35 @@
     public void testIntOpsX(String name, TestData.OfInt data) {
         exerciseOps(data, s -> s.flatMap(e -> IntStream.range(0, e)));
         exerciseOps(data, s -> s.flatMap(e -> IntStream.range(0, e).limit(10)));
+
+        exerciseOps(data, s -> s.boxed().flatMapToInt(e -> IntStream.range(0, e)));
+        exerciseOps(data, s -> s.boxed().flatMapToInt(e -> IntStream.range(0, e).limit(10)));
+    }
+
+    @Test
+    public void testIntOpsShortCircuit() {
+        AtomicInteger count = new AtomicInteger();
+        IntStream.of(0).flatMap(i -> IntStream.range(0, 100)).
+                peek(i -> count.incrementAndGet()).
+                limit(10).toArray();
+        assertEquals(count.get(), 10);
+
+        count.set(0);
+        Stream.of(0).flatMapToInt(i -> IntStream.range(0, 100)).
+                peek(i -> count.incrementAndGet()).
+                limit(10).toArray();
+        assertEquals(count.get(), 10);
     }
 
     //
 
     @Test(dataProvider = "LongStreamTestData", dataProviderClass = LongStreamTestDataProvider.class)
     public void testLongOps(String name, TestData.OfLong data) {
-        Collection<Long> result = exerciseOps(data, s -> s.flatMap(i -> Collections.singleton(i).stream().mapToLong(j -> j)));
+        Collection<Long> result = exerciseOps(data, s -> s.flatMap(LongStream::of));
+        assertEquals(data.size(), result.size());
+        assertContents(data, result);
+
+        result = exerciseOps(data, s -> s.boxed().flatMapToLong(LongStream::of));
         assertEquals(data.size(), result.size());
         assertContents(data, result);
 
@@ -121,11 +159,30 @@
         exerciseOps(data, s -> s.flatMap(e -> LongStream.range(0, e).limit(10)));
     }
 
+    @Test
+    public void testLongOpsShortCircuit() {
+        AtomicInteger count = new AtomicInteger();
+        LongStream.of(0).flatMap(i -> LongStream.range(0, 100)).
+                peek(i -> count.incrementAndGet()).
+                limit(10).toArray();
+        assertEquals(count.get(), 10);
+
+        count.set(0);
+        Stream.of(0).flatMapToLong(i -> LongStream.range(0, 100)).
+                peek(i -> count.incrementAndGet()).
+                limit(10).toArray();
+        assertEquals(count.get(), 10);
+    }
+
     //
 
     @Test(dataProvider = "DoubleStreamTestData", dataProviderClass = DoubleStreamTestDataProvider.class)
     public void testDoubleOps(String name, TestData.OfDouble data) {
-        Collection<Double> result = exerciseOps(data, s -> s.flatMap(i -> Collections.singleton(i).stream().mapToDouble(j -> j)));
+        Collection<Double> result = exerciseOps(data, s -> s.flatMap(DoubleStream::of));
+        assertEquals(data.size(), result.size());
+        assertContents(data, result);
+
+        result = exerciseOps(data, s -> s.boxed().flatMapToDouble(DoubleStream::of));
         assertEquals(data.size(), result.size());
         assertContents(data, result);
 
@@ -138,4 +195,19 @@
         exerciseOps(data, s -> s.flatMap(e -> IntStream.range(0, (int) e).asDoubleStream()));
         exerciseOps(data, s -> s.flatMap(e -> IntStream.range(0, (int) e).limit(10).asDoubleStream()));
     }
+
+    @Test
+    public void testDoubleOpsShortCircuit() {
+        AtomicInteger count = new AtomicInteger();
+        DoubleStream.of(0).flatMap(i -> IntStream.range(0, 100).asDoubleStream()).
+                peek(i -> count.incrementAndGet()).
+                limit(10).toArray();
+        assertEquals(count.get(), 10);
+
+        count.set(0);
+        Stream.of(0).flatMapToDouble(i -> IntStream.range(0, 100).asDoubleStream()).
+                peek(i -> count.incrementAndGet()).
+                limit(10).toArray();
+        assertEquals(count.get(), 10);
+    }
 }