changeset 6020:250adf68e219

Adds parallel() to ArrayList, Vector. Implementation code moved from Arrays -> Streams. Some performance cleanup possibly needed for Array ranges. ie stream(array, offset, length)
author mduigou
date Fri, 14 Sep 2012 22:16:13 -0700
parents b1e04d190241
children 83987ec0e6f1
files src/share/classes/java/util/ArrayList.java src/share/classes/java/util/Arrays.java src/share/classes/java/util/Collection.java src/share/classes/java/util/Vector.java src/share/classes/java/util/streams/Streams.java
diffstat 5 files changed, 172 insertions(+), 196 deletions(-) [+]
line wrap: on
line diff
--- a/src/share/classes/java/util/ArrayList.java	Fri Sep 14 22:13:04 2012 -0700
+++ b/src/share/classes/java/util/ArrayList.java	Fri Sep 14 22:16:13 2012 -0700
@@ -25,6 +25,9 @@
 
 package java.util;
 
+import java.util.streams.Stream;
+import java.util.streams.Streams;
+
 /**
  * Resizable-array implementation of the <tt>List</tt> interface.  Implements
  * all optional list operations, and permits all elements, including
@@ -1128,4 +1131,9 @@
                 throw new ConcurrentModificationException();
         }
     }
+
+    @Override
+    public Stream<E> parallel() {
+        return Streams.parallel(this);
+    }
 }
--- a/src/share/classes/java/util/Arrays.java	Fri Sep 14 22:13:04 2012 -0700
+++ b/src/share/classes/java/util/Arrays.java	Fri Sep 14 22:16:13 2012 -0700
@@ -2903,6 +2903,11 @@
         public boolean contains(Object o) {
             return indexOf(o) != -1;
         }
+
+        @Override
+        public Stream<E> parallel() {
+            return Streams.parallel(this);
+        }
     }
 
     /**
@@ -3686,188 +3691,7 @@
         dejaVu.remove(a);
     }
 
-    private static class ArrayIterator<T> implements Iterator<T> {
-        protected final T[] array;
-        protected final int endOffset;
-        protected int offset;
-
-        private ArrayIterator(T[] array, int startOffset, int len) {
-            this.array = Objects.requireNonNull(array);
-            this.endOffset = startOffset + len;
-            this.offset = startOffset;
-
-            assert (offset >= 0) && ((offset < array.length) || (offset == endOffset)): "offset not in array";
-            assert (endOffset >= offset) && (endOffset <= array.length) : "end not in array";
-        }
-
-        @Override
-        public boolean hasNext() {
-            return offset < endOffset;
-        }
-
-        @Override
-        public T next() {
-            if(!hasNext()) {
-                throw new NoSuchElementException();
-            }
-            return array[offset++];
-        }
-    }
-
-    private static class ArraySpliterator<T> extends ArrayIterator<T> implements Spliterator<T> {
-        boolean iterating = false;
-
-        ArraySpliterator(T[] array) {
-            this(array, 0, array.length);
-        }
-
-        ArraySpliterator(T[] array, int offset, int length) {
-            super(array, offset, length);
-        }
-
-        @Override
-        public int getRemainingSizeIfKnown() {
-            return endOffset - offset;
-        }
-
-        @Override
-        public boolean isPredictableSplits() {
-            return true;
-        }
-
-        @Override
-        public void into(Sink<T, ?, ?> sink) {
-            sink.begin(getRemainingSizeIfKnown());
-            if (offset < endOffset) {
-                // Strange-looking way to iterate; reduce heap write traffic
-                int wasOffset = offset;
-                offset = endOffset;
-                for (int i=wasOffset; i<endOffset; i++)
-                    sink.accept(array[i]);
-            }
-            sink.end();
-        }
-
-        @Override
-        public int getNaturalSplitArity() {
-            return (endOffset - offset > 1) ? 1 : 0;
-        }
-
-        @Override
-        public Spliterator<T> split() {
-            if (iterating)
-                throw new IllegalStateException("split after iterate");
-            int t = (endOffset - offset) / 2;
-            Spliterator<T> ret = new ArraySpliterator<>(array, offset, t);
-            offset += t;
-            return ret;
-        }
-
-        @Override
-        public Iterator<T> iterator() {
-            iterating = true;
-            return this;
-        }
-    }
-
-    private static class ArrayStreamAccessor<T>
-            extends ArrayIterator<T> implements StreamAccessor<T> {
-        public ArrayStreamAccessor(T[] array, int startOffset, int len) {
-            super(array, startOffset, len);
-        }
-
-        @Override
-        public void into(Sink<T, ?, ?> sink) {
-            sink.begin(endOffset-offset);
-            if (offset < endOffset) {
-                // Strange-looking way to iterate; reduce heap write traffic
-                int wasOffset = offset;
-                offset = endOffset;
-                for (int i=wasOffset; i<endOffset; i++)
-                    sink.accept(array[i]);
-            }
-            sink.end();
-        }
-
-        @Override
-        public Iterator<T> iterator() {
-            return this;
-        }
-
-        @Override
-        public int getStreamFlags() {
-            return Stream.FLAG_SIZED;
-        }
-
-        @Override
-        public int getSizeOrEstimate() {
-            return endOffset - offset;
-        }
-
-        @Override
-        public boolean isParallel() {
-            return false;
-        }
-
-        @Override
-        public boolean isPredictableSplits() {
-            return false;
-        }
-
-        @Override
-        public StreamShape getShape() {
-            return StreamShape.SCALAR;
-        }
-
-        @Override
-        public Spliterator<T> spliterator() {
-            throw new UnsupportedOperationException();
-        }
-    }
-
-    private static class ArrayParallelStreamAccessor<T>
-            extends ArraySpliterator<T> implements StreamAccessor<T> {
-        private final int size;
-
-        ArrayParallelStreamAccessor(T[] array) {
-            this(array, 0, array.length);
-        }
-
-        ArrayParallelStreamAccessor(T[] array, int offset, int length) {
-            super(array, offset, length);
-            this.size = length;
-        }
-
-        @Override
-        public int getStreamFlags() {
-            return Stream.FLAG_SIZED;
-        }
-
-        @Override
-        public int getSizeOrEstimate() {
-            return size;
-        }
-
-        @Override
-        public boolean isParallel() {
-            return true;
-        }
-
-        @Override
-        public StreamShape getShape() {
-            return StreamShape.SCALAR;
-        }
-
-        @Override
-        public Iterator<T> iterator() {
-            return this;
-        }
-
-        @Override
-        public Spliterator<T> spliterator() {
-            return this;
-        }
-    }
+
 
     private static class ArraySink<T> implements Sink.OfLinear<T> {
         private final T[] array;
@@ -3904,43 +3728,43 @@
 
 
     public static<T> Iterator<T> iterator(T[] array) {
-        return iterator(array, 0, array.length);
+        return iterable(array).iterator();
     }
 
     public static<T> Iterator<T> iterator(T[] array, int offset, int length) {
-        return new ArrayIterator<>(array, offset, length);
+        return iterable(array,offset,length).iterator();
     }
 
     public static<T> Iterable<T> iterable(T[] array) {
-        return iterable(array, 0, array.length);
+        return asList(array);
     }
 
     public static<T> Iterable<T> iterable(T[] array, int offset, int length) {
-        return () -> new ArrayIterator<>(array, offset, length);
+        return asList(array).subList(offset, offset + length);
     }
 
     public static<T> Stream<T> stream(T[] array) {
-        return stream(array, 0, array.length);
+        return Streams.stream(asList(array));
     }
 
     public static<T> Stream<T> stream(T[] array, int offset, int length) {
-        return new LinearPipeline<>(new ArrayStreamAccessor<>(array, offset, length));
+        return Streams.stream(asList(array).subList(offset, offset + length));
     }
 
-    public static<T> Spliterator<T> spliterator(T[] array) {
-        return spliterator(array, 0, array.length);
+    public static <T, L extends RandomAccess & List<T>>  Spliterator<T> spliterator(T[] array) {
+        return Streams.spliterator((L) asList(array));
     }
 
-    public static<T> Spliterator<T> spliterator(T[] array, int offset, int length) {
-        return new ArraySpliterator<>(array, offset, length);
+    public static <T, L extends RandomAccess & List<T>> Spliterator<T> spliterator(T[] array, int offset, int length) {
+        return Streams.spliterator((L) asList(array).subList(offset, offset+length));
     }
 
-    public static<T> Stream<T> parallel(T[] array) {
-        return parallel(array, 0, array.length);
+    public static <T, L extends RandomAccess & List<T>>  Stream<T> parallel(T[] array) {
+        return Streams.parallel((L) asList(array));
     }
 
     public static<T> Stream<T> parallel(T[] array, int offset, int length) {
-        return new LinearPipeline<>(new ArrayParallelStreamAccessor<T>(array, offset, length));
+        return Streams.stream(asList(array).subList(offset, offset + length));
     }
 
     public static<T> Streamable<T> asStreamable(final T[] array, final int offset, final int length) {
--- a/src/share/classes/java/util/Collection.java	Fri Sep 14 22:13:04 2012 -0700
+++ b/src/share/classes/java/util/Collection.java	Fri Sep 14 22:16:13 2012 -0700
@@ -491,7 +491,7 @@
 
     @Override
     Stream<E> stream() default {
-        return Streams.stream(this, size());
+        return Streams.stream(this);
     }
 
     @Override
--- a/src/share/classes/java/util/Vector.java	Fri Sep 14 22:13:04 2012 -0700
+++ b/src/share/classes/java/util/Vector.java	Fri Sep 14 22:16:13 2012 -0700
@@ -25,6 +25,9 @@
 
 package java.util;
 
+import java.util.streams.Stream;
+import java.util.streams.Streams;
+
 /**
  * The {@code Vector} class implements a growable array of
  * objects. Like an array, it contains components that can be
@@ -1209,4 +1212,9 @@
             lastRet = -1;
         }
     }
+
+    @Override
+    public Stream<E> parallel() {
+        return Streams.parallel(this);
+    }
 }
--- a/src/share/classes/java/util/streams/Streams.java	Fri Sep 14 22:13:04 2012 -0700
+++ b/src/share/classes/java/util/streams/Streams.java	Fri Sep 14 22:16:13 2012 -0700
@@ -91,6 +91,14 @@
         return new LinearPipeline<>(new IteratorStreamAccessor<>(source));
     }
 
+    public static <T, L extends RandomAccess & List<T>> Spliterator<T> spliterator(L source) {
+        return new RandomAccessListSpliterator(source);
+    }
+
+    public static <T, L extends RandomAccess & List<T>> Stream<T> parallel(L source) {
+        return new LinearPipeline<>(new RandomAccessListParallelStreamAccessor<>(source));
+    }
+
     public static<T> Stream<T> parallel(Spliterator<T> source, int sizeOrEstimate) {
         return new LinearPipeline<>(new SpliteratorStreamAccessor<>(source, sizeOrEstimate));
     }
@@ -570,4 +578,132 @@
             return iterator().curValue();
         }
     }
+
+    private static class RandomAccessListIterator<T, L extends RandomAccess & List<T>> implements Iterator<T> {
+        protected final L elements;
+        protected final int endOffset;
+        protected int offset;
+
+        private RandomAccessListIterator(L elements, int startOffset, int len) {
+            this.elements = Objects.requireNonNull(elements);
+            this.endOffset = startOffset + len;
+            this.offset = startOffset;
+
+            assert (offset >= 0) && ((offset < elements.size()) || (offset == endOffset)): "offset not in array";
+            assert (endOffset >= offset) && (endOffset <= elements.size()) : "end not in array";
+        }
+
+        @Override
+        public boolean hasNext() {
+            return offset < endOffset;
+        }
+
+        @Override
+        public T next() {
+            if(!hasNext()) {
+                throw new NoSuchElementException();
+            }
+            return elements.get(offset++);
+        }
+    }
+
+    private static class RandomAccessListSpliterator<T, L extends RandomAccess & List<T>> extends RandomAccessListIterator<T,L> implements Spliterator<T> {
+        boolean iterating = false;
+
+        RandomAccessListSpliterator(L elements) {
+            this(elements, 0, elements.size());
+        }
+
+        RandomAccessListSpliterator(L elements, int offset, int length) {
+            super(elements, offset, length);
+        }
+
+        @Override
+        public int getRemainingSizeIfKnown() {
+            return endOffset - offset;
+        }
+
+        @Override
+        public boolean isPredictableSplits() {
+            return true;
+        }
+
+        @Override
+        public void into(Sink<T, ?, ?> sink) {
+            sink.begin(getRemainingSizeIfKnown());
+            if (offset < endOffset) {
+                // Strange-looking way to iterate; reduce heap write traffic
+                int wasOffset = offset;
+                offset = endOffset;
+                for (int i=wasOffset; i<endOffset; i++)
+                    sink.accept(elements.get(i));
+            }
+            sink.end();
+        }
+
+        @Override
+        public int getNaturalSplitArity() {
+            return (endOffset - offset > 1) ? 1 : 0;
+        }
+
+        @Override
+        public Spliterator<T> split() {
+            if (iterating)
+                throw new IllegalStateException("split after iterate");
+            int t = (endOffset - offset) / 2;
+            Spliterator<T> ret = new RandomAccessListSpliterator<>(elements, offset, t);
+            offset += t;
+            return ret;
+        }
+
+        @Override
+        public Iterator<T> iterator() {
+            iterating = true;
+            return this;
+        }
+    }
+
+    private static class RandomAccessListParallelStreamAccessor<T,L extends RandomAccess & List<T>>
+            extends RandomAccessListSpliterator<T,L> implements StreamAccessor<T> {
+        private final int size;
+
+        RandomAccessListParallelStreamAccessor(L elements) {
+            this(elements, 0, elements.size());
+        }
+
+        RandomAccessListParallelStreamAccessor(L elements, int offset, int length) {
+            super(elements, offset, length);
+            this.size = length;
+        }
+
+        @Override
+        public int getStreamFlags() {
+            return Stream.FLAG_SIZED;
+        }
+
+        @Override
+        public int getSizeOrEstimate() {
+            return size;
+        }
+
+        @Override
+        public boolean isParallel() {
+            return true;
+        }
+
+        @Override
+        public StreamShape getShape() {
+            return StreamShape.SCALAR;
+        }
+
+        @Override
+        public Iterator<T> iterator() {
+            return this;
+        }
+
+        @Override
+        public Spliterator<T> spliterator() {
+            return this;
+        }
+    }
 }