changeset 4718:9a0b17ea5292

Separate Spliterable.iterator and Spliterable.sequentia; fix return type in Iterable.forEach; add Iterable.flatMap and ParallelIterable.flatMap
author briangoetz
date Wed, 28 Dec 2011 23:27:04 -0500
parents 085fb9c9164f
children 7e592a11b492
files src/share/classes/java/lang/Iterable.java src/share/classes/java/util/Arrays.java src/share/classes/java/util/Comparators.java src/share/classes/java/util/Iterables.java src/share/classes/java/util/Iterators.java src/share/classes/java/util/ParallelIterable.java src/share/classes/java/util/ParallelIterables.java src/share/classes/java/util/Splittable.java test-ng/tests/java/util/IterableTest.java test-ng/tests/java/util/IterablesTest.java test-ng/tests/java/util/IteratorsTest.java test-ng/tests/java/util/LambdaTestHelpers.java test-ng/tests/java/util/ParallelIterableTest.java
diffstat 13 files changed, 354 insertions(+), 87 deletions(-) [+]
line wrap: on
line diff
--- a/src/share/classes/java/lang/Iterable.java	Sun Dec 18 15:47:06 2011 -0800
+++ b/src/share/classes/java/lang/Iterable.java	Wed Dec 28 23:27:04 2011 -0500
@@ -83,8 +83,8 @@
      * @param block The operation to be performed upon each each element.
      * @return This Iterable.
      */
-    Iterable<T> forEach(Block<? super T> block) default {
-        return Iterables.forEach(this, block);
+    void forEach(Block<? super T> block) default {
+        Iterables.forEach(this, block);
     }
 
     /**
@@ -107,6 +107,10 @@
          return Iterables.filterMap(this, predicate, mapper);
      }
 
+    <U> Iterable<U> flatMap(Mapper<? super T, ? extends Iterable<U>> mapper) default {
+        return Iterables.flatMap(this, mapper);
+    }
+
     /**
      * Reduce elements to a single value.
      *
--- a/src/share/classes/java/util/Arrays.java	Sun Dec 18 15:47:06 2011 -0800
+++ b/src/share/classes/java/util/Arrays.java	Wed Dec 28 23:27:04 2011 -0500
@@ -3745,23 +3745,18 @@
         }
 
         @Override
+        public Iterator<T> iterator() {
+            return new ArrayIterator<>(array, offset, length);
+        }
+
+        @Override
         public Iterable<T> sequential() {
-            return new Iterable<T>() {
-                @Override
-                public Iterator<T> iterator() {
-                    return new ArrayIterator<>(array, offset, length);
-                }
-            };
+            return () -> iterator();
         }
     }
 
     public static<T> Iterable<T> iterable(final T[] array) {
-        return new Iterable<T>() {
-            @Override
-            public Iterator<T> iterator() {
-                return new ArrayIterator<>(array, 0, array.length);
-            }
-        };
+        return () -> new ArrayIterator<>(array, 0, array.length);
     }
 
     public static<T> ParallelIterable<T> parallel(final T[] array) {
--- a/src/share/classes/java/util/Comparators.java	Sun Dec 18 15:47:06 2011 -0800
+++ b/src/share/classes/java/util/Comparators.java	Wed Dec 28 23:27:04 2011 -0500
@@ -126,10 +126,10 @@
         private final Comparator<? super T> second;
 
         private ComposedComparator(Comparator<T> first, Comparator<? super T> second) {
+            Objects.requireNonNull(first);
+            Objects.requireNonNull(second);
             this.first = first;
             this.second = second;
-            Objects.requireNonNull(first);
-            Objects.requireNonNull(second);
         }
 
         public int compare(T c1, T c2) {
--- a/src/share/classes/java/util/Iterables.java	Sun Dec 18 15:47:06 2011 -0800
+++ b/src/share/classes/java/util/Iterables.java	Wed Dec 28 23:27:04 2011 -0500
@@ -203,6 +203,18 @@
             // @@@ Override count()?
         };
     }
+    
+    public static<T,U> Iterable<U> flatMap(final Iterable<? extends T> iterable, final Mapper<? super T, ? extends Iterable<U>> mapper) {
+        Objects.requireNonNull(iterable);
+        Objects.requireNonNull(mapper);
+
+        return new BaseIterable<U>() {
+            @Override
+            public Iterator<U> iterator() {
+                return Iterators.flatMap(iterable.iterator(), mapper);
+            }
+        };
+    }
 
     /**
      * Reduce elements to a single value.
--- a/src/share/classes/java/util/Iterators.java	Sun Dec 18 15:47:06 2011 -0800
+++ b/src/share/classes/java/util/Iterators.java	Wed Dec 28 23:27:04 2011 -0500
@@ -110,6 +110,41 @@
         };
     }
 
+    public static <T, U> Iterator<U> flatMap(final Iterator<? extends T> iterator, final Mapper<? super T, ? extends Iterable<U>> mapper) {
+        Objects.requireNonNull(iterator);
+        Objects.requireNonNull(mapper);
+        return new Iterator<U>() {
+            final Iterator<? extends T> source = iterator;
+            Iterator<? extends U> currentMapped = null;
+
+            @Override
+            public boolean hasNext() {
+                while (true) {
+                    if (currentMapped == null) {
+                        if (!source.hasNext())
+                            return false;
+                        else
+                            currentMapped = mapper.map(source.next()).iterator();
+                    }
+                    else {
+                        if (currentMapped.hasNext())
+                            return true;
+                        else
+                            currentMapped = null;
+                    }
+                }
+            }
+
+            @Override
+            public U next() {
+                // TODO hasNext called twice for each element, cache result instead
+                if (!hasNext())
+                    throw new NoSuchElementException();
+                return currentMapped.next();
+            }
+        };
+    }
+
     public static <T> T reduce(final Iterator<? extends T> iterator, T base, final Operator<T> operator) {
         Objects.requireNonNull(iterator);
         Objects.requireNonNull(operator);
@@ -196,17 +231,7 @@
         final PriorityQueue<T> pq = new PriorityQueue<>();
         while (iterator.hasNext())
             pq.add(iterator.next());
-        return new Iterator<T>() {
-            @Override
-            public boolean hasNext() {
-                return !pq.isEmpty();
-            }
-
-            @Override
-            public T next() {
-                return pq.remove();
-            }
-        };
+        return new PQRemovalIterator<>(pq);
     }
 
     public static <T> Iterator<T> sorted(Iterator<? extends T> iterator, Comparator<? super T> comp) {
@@ -215,17 +240,25 @@
         final PriorityQueue<T> pq = new PriorityQueue<>(DEFAULT_PRIORITY_QUEUE_SIZE, comp);
         while (iterator.hasNext())
             pq.add(iterator.next());
-        return new Iterator<T>() {
-            @Override
-            public boolean hasNext() {
-                return !pq.isEmpty();
-            }
+        return new PQRemovalIterator<>(pq);
+    }
 
-            @Override
-            public T next() {
-                return pq.remove();
-            }
-        };
+    private static class PQRemovalIterator<T> implements Iterator<T> {
+        private final PriorityQueue<T> pq;
+
+        public PQRemovalIterator(PriorityQueue<T> pq) {
+            this.pq = pq;
+        }
+
+        @Override
+        public boolean hasNext() {
+            return !pq.isEmpty();
+        }
+
+        @Override
+        public T next() {
+            return pq.remove();
+        }
     }
 
     // @@@ cumulation
--- a/src/share/classes/java/util/ParallelIterable.java	Sun Dec 18 15:47:06 2011 -0800
+++ b/src/share/classes/java/util/ParallelIterable.java	Wed Dec 28 23:27:04 2011 -0500
@@ -59,9 +59,13 @@
      * and elements of type {@code U}.
      * @return An Iterable view consisting of the mapped elements.
      */
-     <U> ParallelIterable<U> map(Mapper<? super T, ? extends U> mapper) default {
-         return ParallelIterables.map(this, mapper);
-     }
+    <U> ParallelIterable<U> map(Mapper<? super T, ? extends U> mapper) default {
+        return ParallelIterables.map(this, mapper);
+    }
+
+    <U> ParallelIterable<U> flatMap(Mapper<? super T, ? extends Iterable<U>> mapper) default {
+        return ParallelIterables.flatMap(this, mapper);
+    }
 
     /**
      * Reduce elements to a single value.
--- a/src/share/classes/java/util/ParallelIterables.java	Sun Dec 18 15:47:06 2011 -0800
+++ b/src/share/classes/java/util/ParallelIterables.java	Wed Dec 28 23:27:04 2011 -0500
@@ -2,6 +2,7 @@
 
 import java.util.concurrent.ForkJoinUtils;
 import java.util.concurrent.RecursiveAction;
+import java.util.concurrent.RecursiveTask;
 import java.util.functions.*;
 
 /**
@@ -21,7 +22,7 @@
      */
     public static <T> boolean isEmpty(ParallelIterable<T> iterable) {
         Objects.requireNonNull(iterable);
-        return !iterable.sequential().iterator().hasNext();
+        return anyMatch(iterable, Predicates.alwaysTrue());
     }
 
     private static<T> int calculateDepth(long s) {
@@ -66,6 +67,56 @@
         }
     }
 
+    private static abstract class SequentialTask<T, U, X> extends RecursiveTask<Iterable<U>> {
+        public final int depth;
+        public final ParallelIterable<T> coll;
+        public final SequentialTask<T, U, X> left, right;
+        public final X childContext;
+
+        public SequentialTask(int depth, ParallelIterable<T> coll, X childContext) {
+            this.depth = depth;
+            this.coll = coll;
+            this.childContext = childContext;
+            if (depth > 0) {
+                left = makeNode(depth - 1, coll.left());
+                right = makeNode(depth - 1, coll.right());
+            }
+            else {
+                left = right = null;
+            }
+        }
+
+        public Iterator<U> iterator() {
+            final List<SequentialTask<T, U, X>> nodes = new ArrayList<>(1 << depth);
+            populate(nodes, this);
+            return nodes.flatMap(task -> task.join()).iterator();
+        }
+
+        private static<T, U, X> void populate(List<SequentialTask<T, U, X>> nodes, SequentialTask<T, U, X> task) {
+            if (task.left != null) {
+                populate(nodes, task.left);
+                populate(nodes, task.right);
+            }
+            else
+                nodes.add(task);
+        }
+
+        @Override
+        protected Iterable<U> compute() {
+            if (depth == 0)
+                return leafAction(coll);
+            else {
+                right.fork();
+                left.fork();
+                return null;
+            }
+        }
+        
+        protected abstract SequentialTask<T,U, X> makeNode(int depth, ParallelIterable<T> coll);
+        
+        protected abstract Iterable<U> leafAction(ParallelIterable<T> coll);
+    }
+
     private static class CountTask<T> extends BaseTask<T, CountTask<T>> {
         public long count;
 
@@ -75,7 +126,7 @@
 
         @Override
         public void seq() {
-            count = Iterables.count(coll.sequential());
+            count = Iterators.count(coll.iterator());
         }
 
         @Override
@@ -100,13 +151,11 @@
         }
     }
 
-    private static class Filtered<T> implements ParallelIterable<T> {
-        private final ParallelIterable<T> underlying;
-        private final Predicate<T> predicate;
+    private static abstract class BaseLazy<T, U> implements ParallelIterable<U> {
+        protected final ParallelIterable<T> underlying;
 
-        private Filtered(ParallelIterable<T> underlying, Predicate<T> predicate) {
+        private BaseLazy(ParallelIterable<T> underlying) {
             this.underlying = underlying;
-            this.predicate = predicate;
         }
 
         @Override
@@ -115,19 +164,69 @@
         }
 
         @Override
-        public ParallelIterable<T> left() {
-            return new Filtered<>(underlying.left(), predicate);
+        public ParallelIterable<U> left() {
+            return makeNode(underlying.left());
         }
 
         @Override
-        public ParallelIterable<T> right() {
-            return new Filtered<>(underlying.right(), predicate);
+        public ParallelIterable<U> right() {
+            return makeNode(underlying.right());
         }
 
         @Override
-        public Iterable<T> sequential() {
-            // @@@ Wrong!  This is a sequential traversal
-            return underlying.sequential().filter(predicate);
+        public Iterator<U> iterator() {
+            return leafAction(underlying.iterator());
+        }
+
+        @Override
+        public Iterable<U> sequential() {
+            return () -> {
+                    SequentialTask<T, U, BaseLazy<T, U>> t = new SequentialTaskForBaseLazy<T,U>(calculateDepth(underlying.estimateCount()), underlying, BaseLazy.this);
+                    ForkJoinUtils.defaultFJPool().invoke(t);
+                    return t.iterator();
+            };
+        }
+        
+        protected abstract BaseLazy<T, U> makeNode(ParallelIterable<T> pi);
+
+        protected abstract Iterator<U> leafAction(Iterator<T> it);
+
+    }
+
+    // @@@ Should be an inner class of BaseLazy, but compiler won't let us yet
+    private static class SequentialTaskForBaseLazy<T,U> extends SequentialTask<T, U, BaseLazy<T, U>> {
+        
+        private SequentialTaskForBaseLazy(int depth, ParallelIterable<T> coll, BaseLazy<T,U> lazy) {
+            super(depth, coll, lazy);
+        }
+
+        @Override
+        protected SequentialTask<T, U, BaseLazy<T, U>> makeNode(int depth, ParallelIterable<T> coll) {
+            return new SequentialTaskForBaseLazy<>(depth, coll, childContext);
+        }
+
+        @Override
+        protected Iterable<U> leafAction(ParallelIterable<T> coll) {
+            return () -> childContext.leafAction(coll.iterator());
+        }
+    }
+
+    private static class Filtered<T> extends BaseLazy<T, T> {
+        private final Predicate<? super T> predicate;
+
+        private Filtered(ParallelIterable<T> underlying, Predicate<? super T> predicate) {
+            super(underlying);
+            this.predicate = predicate;
+        }
+
+        @Override
+        protected BaseLazy<T, T> makeNode(ParallelIterable<T> pi) {
+            return new Filtered<>(pi, predicate);
+        }
+
+        @Override
+        protected Iterator<T> leafAction(Iterator<T> it) {
+            return Iterators.filter(it, predicate);
         }
     }
 
@@ -146,7 +245,7 @@
     public static <T> ParallelIterable<T> filter(final ParallelIterable<T> pi, final Predicate<? super T> predicate) {
         Objects.requireNonNull(pi);
         Objects.requireNonNull(predicate);
-        return new Filtered(pi, predicate);
+        return new Filtered<>(pi, predicate);
     }
 
     private static class ForEachTask<T> extends BaseTask<T, ForEachTask<T>> {
@@ -160,7 +259,7 @@
 
         @Override
         public void seq() {
-            coll.sequential().forEach(block);
+            Iterators.forEach(coll.iterator(), block);
         }
 
         @Override
@@ -185,34 +284,22 @@
         ForkJoinUtils.defaultFJPool().invoke(new ForEachTask<>(calculateDepth(pi.estimateCount()), pi, block));
     }
 
-    private static class Mapped<T, U> implements ParallelIterable<U> {
-        private final ParallelIterable<T> underlying;
+    private static class Mapped<T, U> extends BaseLazy<T, U> {
         private final Mapper<? super T, ? extends U> mapper;
 
         private Mapped(ParallelIterable<T> underlying, Mapper<? super T, ? extends U> mapper) {
-            this.underlying = underlying;
+            super(underlying);
             this.mapper = mapper;
         }
 
         @Override
-        public long estimateCount() {
-            return underlying.estimateCount();
+        protected BaseLazy<T, U> makeNode(ParallelIterable<T> pi) {
+            return new Mapped<>(pi, mapper);
         }
 
         @Override
-        public ParallelIterable<U> left() {
-            return new Mapped<>(underlying.left(), mapper);
-        }
-
-        @Override
-        public ParallelIterable<U> right() {
-            return new Mapped<>(underlying.right(), mapper);
-        }
-
-        @Override
-        public Iterable<U> sequential() {
-            // @@@ Wrong!  This is a sequential traversal
-            return underlying.sequential().map(mapper);
+        protected Iterator<U> leafAction(Iterator<T> it) {
+            return Iterators.map(it, mapper);
         }
     }
 
@@ -233,6 +320,31 @@
         return new Mapped<>(pi, mapper);
     }
 
+    private static class FlatMapped<T, U> extends BaseLazy<T,U> {
+        private final Mapper<? super T, ? extends Iterable<U>> mapper;
+
+        private FlatMapped(ParallelIterable<T> underlying, Mapper<? super T, ? extends Iterable<U>> mapper) {
+            super(underlying);
+            this.mapper = mapper;
+        }
+
+        @Override
+        protected BaseLazy<T, U> makeNode(ParallelIterable<T> pi) {
+            return new FlatMapped<>(pi, mapper);
+        }
+
+        @Override
+        protected Iterator<U> leafAction(Iterator<T> it) {
+            return Iterators.flatMap(it, mapper);
+        }
+    }
+
+    public static <T, U> ParallelIterable<U> flatMap(final ParallelIterable<T> pi, final Mapper<? super T, ? extends Iterable<U>> mapper) {
+        Objects.requireNonNull(pi);
+        Objects.requireNonNull(mapper);
+        return new FlatMapped<>(pi, mapper);
+    }
+
     private static class ReduceTask<T> extends BaseTask<T, ReduceTask<T>> {
         private static final long serialVersionUID = 1L;
         public final Operator<T> operator;
@@ -247,7 +359,7 @@
 
         @Override
         public void seq() {
-            value = coll.sequential().reduce(base, operator);
+            value = Iterators.reduce(coll.iterator(), base, operator);
         }
 
         @Override
@@ -277,7 +389,7 @@
 
         @Override
         public void seq() {
-            value = coll.sequential().mapReduce(mapper, base, operator);
+            value = Iterators.mapReduce(coll.iterator(), mapper, base, operator);
         }
 
         @Override
@@ -307,7 +419,7 @@
 
         @Override
         public void seq() {
-            value = coll.sequential().mapReduce(mapper, base, operator);
+            value = Iterators.mapReduce(coll.iterator(), mapper, base, operator);
         }
 
         @Override
@@ -337,7 +449,7 @@
 
         @Override
         public void seq() {
-            value = coll.sequential().mapReduce(mapper, base, operator);
+            value = Iterators.mapReduce(coll.iterator(), mapper, base, operator);
         }
 
         @Override
@@ -367,7 +479,7 @@
 
         @Override
         public void seq() {
-            value = coll.sequential().mapReduce(mapper, base, operator);
+            value = Iterators.mapReduce(coll.iterator(), mapper, base, operator);
         }
 
         @Override
@@ -468,9 +580,9 @@
         @Override
         public void seq() {
             switch (kind) {
-                case ANY: value = coll.sequential().anyMatch(predicate); break;
-                case ALL: value = coll.sequential().allMatch(predicate); break;
-                case NONE: value = coll.sequential().noneMatch(predicate); break;
+                case ANY: value = Iterators.anyMatch(coll.iterator(), predicate); break;
+                case ALL: value = Iterators.allMatch(coll.iterator(), predicate); break;
+                case NONE: value = Iterators.noneMatch(coll.iterator(), predicate); break;
             }
         }
 
--- a/src/share/classes/java/util/Splittable.java	Sun Dec 18 15:47:06 2011 -0800
+++ b/src/share/classes/java/util/Splittable.java	Wed Dec 28 23:27:04 2011 -0500
@@ -2,13 +2,38 @@
 
 /**
  * A decomposable container for elements. The elements can be accessed by
- * requesting the sequential {@code Iterable}.
+ * requesting a sequential {@link Iterator}.
  *
  * @param <T> the type of elements returned by the sequential iterator
  * @param <S> Result type of split
  */
 public interface Splittable<T, S extends Splittable<T, S>> {
+    /** Return an {@link Iterator}  for the elements of this split.   In general, this method is only called
+     * at the leaves of a decomposition tree, though it can be called at any level.  */
+    Iterator<T> iterator();
+
+    /** Decompose this split into two splits, and return the left split.  If further splitting is impossible,
+     * {@code left} may return a {@code Splittable} representing the entire split, or an empty split.
+     */
     S left();
+
+    /** Decompose this split into two splits, and return the right split.  If further splitting is impossible,
+     * {@code right} may return a {@code Splittable} representing the entire split, or an empty split.
+     */
     S right();
+
+    /**
+     * Produce an {@link Iterable} representing the contents of this {@code Splittable}.  In general, this method is
+     * only called at the top of a decomposition tree, indicating that operations that produced the {@code Spliterable}
+     * can happen in parallel, but the results are assembled for sequential traversal.  This is designed to support
+     * patterns like:
+     *     collection.filter(t -> t.matches(k))
+     *               .map(t -> t.getLabel())
+     *               .sorted()
+     *               .sequential()
+     *               .forEach(e -> println(e));
+     * where the filter / map / sort operations can occur in parallel, and then the results can be traversed
+     * sequentially in a predicatable order.
+     */
     Iterable<T> sequential();
 }
--- a/test-ng/tests/java/util/IterableTest.java	Sun Dec 18 15:47:06 2011 -0800
+++ b/test-ng/tests/java/util/IterableTest.java	Wed Dec 28 23:27:04 2011 -0500
@@ -107,6 +107,13 @@
         assertCountSum(countTo(0).map(mDoubler), 0, 0);
     }
 
+    public void testFlatMap() {
+        Iterable<String> strings = Arrays.asList("hello", "there", "", "yada");
+        Iterable<String> empty = Arrays.asList();
+        assertConcat(strings.flatMap(flattenChars).iterator(), "hellothereyada");
+        assertConcat(empty.flatMap(flattenChars).iterator(), "");
+    }
+
     public void testReduce() {
         List<Integer> list = countTo(10);
 
--- a/test-ng/tests/java/util/IterablesTest.java	Sun Dec 18 15:47:06 2011 -0800
+++ b/test-ng/tests/java/util/IterablesTest.java	Wed Dec 28 23:27:04 2011 -0500
@@ -27,6 +27,7 @@
 import org.testng.annotations.Test;
 
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.functions.Mapper;
 
 import static java.util.LambdaTestHelpers.*;
 import static org.testng.Assert.*;
@@ -116,6 +117,11 @@
         assertCountSum(Iterables.map(countTo(0), mDoubler), 0, 0);
     }
 
+    public void testFlatMap() {
+        Iterable<String> strings = Arrays.asList("hello", "there", "", "yada");
+        assertConcat(Iterables.flatMap(strings, flattenChars).iterator(), "hellothereyada");
+    }
+
     public void testReduce() {
         List<Integer> list = countTo(10);
 
--- a/test-ng/tests/java/util/IteratorsTest.java	Sun Dec 18 15:47:06 2011 -0800
+++ b/test-ng/tests/java/util/IteratorsTest.java	Wed Dec 28 23:27:04 2011 -0500
@@ -27,6 +27,7 @@
 import org.testng.annotations.Test;
 
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.functions.Mapper;
 
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
@@ -109,6 +110,11 @@
         assertCountSum(Iterators.map(countTo(0).iterator(), mDoubler), 0, 0);
     }
 
+    public void testFlatMap() {
+        Iterable<String> strings = Arrays.asList("hello", "there", "", "yada");
+        assertConcat(Iterators.flatMap(strings.iterator(), flattenChars), "hellothereyada");
+    }
+
     public void testReduce() {
         List<Integer> list = countTo(10);
 
--- a/test-ng/tests/java/util/LambdaTestHelpers.java	Sun Dec 18 15:47:06 2011 -0800
+++ b/test-ng/tests/java/util/LambdaTestHelpers.java	Wed Dec 28 23:27:04 2011 -0500
@@ -51,6 +51,31 @@
     public static final Block<Integer> bEmpty = x -> { };
     public static final Comparator<Integer> cInteger = (a, b) -> Integer.compare(a, b);
 
+    public static final Mapper<String, Iterable<Character>> flattenChars = new Mapper<String, Iterable<Character>>() {
+        @Override
+        public Iterable<Character> map(final String s) {
+            return new Iterable<Character>() {
+                @Override
+                public Iterator<Character> iterator() {
+                    return new Iterator<Character>() {
+                        private int offset = 0;
+                        private final int len = s.length();
+
+                        @Override
+                        public boolean hasNext() {
+                            return offset < len;
+                        }
+
+                        @Override
+                        public Character next() {
+                            return s.charAt(offset++);
+                        }
+                    };
+                }
+            };
+        }
+    };
+
     public static List<Integer> countTo(int n) {
         ArrayList<Integer> list = new ArrayList<>();
         for (int i=1; i<=n; i++) {
@@ -90,6 +115,15 @@
         assertEquals(s, sum);
     }
 
+    public static void assertConcat(Iterator<Character> it, String result) {
+        StringBuilder sb = new StringBuilder();
+        while (it.hasNext()) {
+            sb.append(it.next());
+        }
+
+        assertEquals(result, sb.toString());
+    }
+
     public static<T extends Comparable<? super T>> void assertSorted(Iterator<T> i) {
         T last = i.next();
         while (i.hasNext()) {
@@ -130,7 +164,7 @@
         }
         assertTrue(!pI.hasNext());
 
-        for (int depth=0; depth<10; depth++) {
+        for (int depth=0; depth<6; depth++) {
             Iterable<Iterable<T>> splits = split(pi, depth);
             assertSplitContents(splits, list);
         }
@@ -171,7 +205,11 @@
 
     private static <T> Iterable<Iterable<T>> splitHelper(ParallelIterable<T> s, int depth, List<Iterable<T>> iterables) {
         if (depth == 0) {
-            iterables.add(s.sequential().into(new ArrayList<T>()));
+            Iterator<T> it = s.iterator();
+            List<T> list = new ArrayList<>();
+            while (it.hasNext())
+                list.add(it.next());
+            iterables.add(list);
         }
         else {
             splitHelper(s.left(), depth-1, iterables);
--- a/test-ng/tests/java/util/ParallelIterableTest.java	Sun Dec 18 15:47:06 2011 -0800
+++ b/test-ng/tests/java/util/ParallelIterableTest.java	Wed Dec 28 23:27:04 2011 -0500
@@ -5,6 +5,7 @@
 
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.functions.Mapper;
 import java.util.functions.Predicate;
 
 import static java.util.LambdaTestHelpers.*;
@@ -22,7 +23,6 @@
         assertContents(pi, Arrays.iterable(array));
     }
 
-
     @Test(dataProvider = "lists")
     public void testEmpty(List<Integer> list, ParallelIterable<Integer> pi) {
 
@@ -32,8 +32,11 @@
 
     @Test(dataProvider = "lists")
     public void testFilter(List<Integer> list, ParallelIterable<Integer> pi) {
-        for (Predicate<Integer> p : Arrays.asList(pTrue, pFalse, pEven, pOdd))
+        for (Predicate<Integer> p : Arrays.asList(pTrue, pFalse, pEven, pOdd)) {
+            assertContents(pi.filter(p), list.filter(p));
             assertContents(pi.filter(p), pi.sequential().filter(p));
+            assertEquals(pi.filter(p).into(new ArrayList<>()), list.filter(p).into(new ArrayList<>()));
+        }
 
         assertContents(pi.filter(pEven).filter(pOdd), Collections.<Integer>emptyList());
         assertContents(pi.filter(pEven).filter(pEven), list.filter(pEven));
@@ -44,10 +47,32 @@
     @Test(dataProvider = "lists")
     public void testMap(List<Integer> list, ParallelIterable<Integer> pi) {
         assertContents(pi.map(mDoubler), list.map(mDoubler));
+        assertContents(pi.map(mDoubler), pi.sequential().map(mDoubler));
         assertEquals(pi.map(mDoubler).into(new ArrayList<>()), list.map(mDoubler).into(new ArrayList<>()));
     }
 
     @Test(dataProvider = "lists")
+    public void testFlatMap(List<Integer> list, ParallelIterable<Integer> pi) {
+        Mapper<Integer, Iterable<Integer>> mIntToBits = new Mapper<Integer, Iterable<Integer>>() {
+            @Override
+            public Iterable<Integer> map(Integer integer) {
+                int num = integer;
+                List<Integer> list = new ArrayList<>();
+                for (int i=0; i<16 && num != 0; i++) {
+                    if ((num & (1 << i)) != 0) {
+                        num &= ~(1 << i);
+                        list.add(i);
+                    }
+                }
+                return list;
+            }
+        };
+        assertContents(pi.flatMap(mIntToBits), list.flatMap(mIntToBits));
+        assertContents(pi.flatMap(mIntToBits), pi.sequential().flatMap(mIntToBits));
+        assertEquals(pi.flatMap(mIntToBits).into(new ArrayList<>()), list.flatMap(mIntToBits).into(new ArrayList<>()));
+    }
+
+    @Test(dataProvider = "lists")
     public void testReduce(List<Integer> list, ParallelIterable<Integer> pi) {
         assertEquals(pi.reduce(0, rPlus), list.reduce(0, rPlus));
         assertEquals(pi.reduce(Integer.MIN_VALUE, rMax), list.reduce(Integer.MIN_VALUE, rMax));