OpenJDK / lambda / lambda / jdk
changeset 6020:250adf68e219
Adds parallel() to ArrayList, Vector. Implementation code moved from Arrays -> Streams. Some performance cleanup possibly needed for Array ranges. ie stream(array, offset, length)
author | mduigou |
---|---|
date | Fri, 14 Sep 2012 22:16:13 -0700 |
parents | b1e04d190241 |
children | 83987ec0e6f1 |
files | src/share/classes/java/util/ArrayList.java src/share/classes/java/util/Arrays.java src/share/classes/java/util/Collection.java src/share/classes/java/util/Vector.java src/share/classes/java/util/streams/Streams.java |
diffstat | 5 files changed, 172 insertions(+), 196 deletions(-) [+] |
line wrap: on
line diff
--- a/src/share/classes/java/util/ArrayList.java Fri Sep 14 22:13:04 2012 -0700 +++ b/src/share/classes/java/util/ArrayList.java Fri Sep 14 22:16:13 2012 -0700 @@ -25,6 +25,9 @@ package java.util; +import java.util.streams.Stream; +import java.util.streams.Streams; + /** * Resizable-array implementation of the <tt>List</tt> interface. Implements * all optional list operations, and permits all elements, including @@ -1128,4 +1131,9 @@ throw new ConcurrentModificationException(); } } + + @Override + public Stream<E> parallel() { + return Streams.parallel(this); + } }
--- a/src/share/classes/java/util/Arrays.java Fri Sep 14 22:13:04 2012 -0700 +++ b/src/share/classes/java/util/Arrays.java Fri Sep 14 22:16:13 2012 -0700 @@ -2903,6 +2903,11 @@ public boolean contains(Object o) { return indexOf(o) != -1; } + + @Override + public Stream<E> parallel() { + return Streams.parallel(this); + } } /** @@ -3686,188 +3691,7 @@ dejaVu.remove(a); } - private static class ArrayIterator<T> implements Iterator<T> { - protected final T[] array; - protected final int endOffset; - protected int offset; - - private ArrayIterator(T[] array, int startOffset, int len) { - this.array = Objects.requireNonNull(array); - this.endOffset = startOffset + len; - this.offset = startOffset; - - assert (offset >= 0) && ((offset < array.length) || (offset == endOffset)): "offset not in array"; - assert (endOffset >= offset) && (endOffset <= array.length) : "end not in array"; - } - - @Override - public boolean hasNext() { - return offset < endOffset; - } - - @Override - public T next() { - if(!hasNext()) { - throw new NoSuchElementException(); - } - return array[offset++]; - } - } - - private static class ArraySpliterator<T> extends ArrayIterator<T> implements Spliterator<T> { - boolean iterating = false; - - ArraySpliterator(T[] array) { - this(array, 0, array.length); - } - - ArraySpliterator(T[] array, int offset, int length) { - super(array, offset, length); - } - - @Override - public int getRemainingSizeIfKnown() { - return endOffset - offset; - } - - @Override - public boolean isPredictableSplits() { - return true; - } - - @Override - public void into(Sink<T, ?, ?> sink) { - sink.begin(getRemainingSizeIfKnown()); - if (offset < endOffset) { - // Strange-looking way to iterate; reduce heap write traffic - int wasOffset = offset; - offset = endOffset; - for (int i=wasOffset; i<endOffset; i++) - sink.accept(array[i]); - } - sink.end(); - } - - @Override - public int getNaturalSplitArity() { - return (endOffset - offset > 1) ? 1 : 0; - } - - @Override - public Spliterator<T> split() { - if (iterating) - throw new IllegalStateException("split after iterate"); - int t = (endOffset - offset) / 2; - Spliterator<T> ret = new ArraySpliterator<>(array, offset, t); - offset += t; - return ret; - } - - @Override - public Iterator<T> iterator() { - iterating = true; - return this; - } - } - - private static class ArrayStreamAccessor<T> - extends ArrayIterator<T> implements StreamAccessor<T> { - public ArrayStreamAccessor(T[] array, int startOffset, int len) { - super(array, startOffset, len); - } - - @Override - public void into(Sink<T, ?, ?> sink) { - sink.begin(endOffset-offset); - if (offset < endOffset) { - // Strange-looking way to iterate; reduce heap write traffic - int wasOffset = offset; - offset = endOffset; - for (int i=wasOffset; i<endOffset; i++) - sink.accept(array[i]); - } - sink.end(); - } - - @Override - public Iterator<T> iterator() { - return this; - } - - @Override - public int getStreamFlags() { - return Stream.FLAG_SIZED; - } - - @Override - public int getSizeOrEstimate() { - return endOffset - offset; - } - - @Override - public boolean isParallel() { - return false; - } - - @Override - public boolean isPredictableSplits() { - return false; - } - - @Override - public StreamShape getShape() { - return StreamShape.SCALAR; - } - - @Override - public Spliterator<T> spliterator() { - throw new UnsupportedOperationException(); - } - } - - private static class ArrayParallelStreamAccessor<T> - extends ArraySpliterator<T> implements StreamAccessor<T> { - private final int size; - - ArrayParallelStreamAccessor(T[] array) { - this(array, 0, array.length); - } - - ArrayParallelStreamAccessor(T[] array, int offset, int length) { - super(array, offset, length); - this.size = length; - } - - @Override - public int getStreamFlags() { - return Stream.FLAG_SIZED; - } - - @Override - public int getSizeOrEstimate() { - return size; - } - - @Override - public boolean isParallel() { - return true; - } - - @Override - public StreamShape getShape() { - return StreamShape.SCALAR; - } - - @Override - public Iterator<T> iterator() { - return this; - } - - @Override - public Spliterator<T> spliterator() { - return this; - } - } + private static class ArraySink<T> implements Sink.OfLinear<T> { private final T[] array; @@ -3904,43 +3728,43 @@ public static<T> Iterator<T> iterator(T[] array) { - return iterator(array, 0, array.length); + return iterable(array).iterator(); } public static<T> Iterator<T> iterator(T[] array, int offset, int length) { - return new ArrayIterator<>(array, offset, length); + return iterable(array,offset,length).iterator(); } public static<T> Iterable<T> iterable(T[] array) { - return iterable(array, 0, array.length); + return asList(array); } public static<T> Iterable<T> iterable(T[] array, int offset, int length) { - return () -> new ArrayIterator<>(array, offset, length); + return asList(array).subList(offset, offset + length); } public static<T> Stream<T> stream(T[] array) { - return stream(array, 0, array.length); + return Streams.stream(asList(array)); } public static<T> Stream<T> stream(T[] array, int offset, int length) { - return new LinearPipeline<>(new ArrayStreamAccessor<>(array, offset, length)); + return Streams.stream(asList(array).subList(offset, offset + length)); } - public static<T> Spliterator<T> spliterator(T[] array) { - return spliterator(array, 0, array.length); + public static <T, L extends RandomAccess & List<T>> Spliterator<T> spliterator(T[] array) { + return Streams.spliterator((L) asList(array)); } - public static<T> Spliterator<T> spliterator(T[] array, int offset, int length) { - return new ArraySpliterator<>(array, offset, length); + public static <T, L extends RandomAccess & List<T>> Spliterator<T> spliterator(T[] array, int offset, int length) { + return Streams.spliterator((L) asList(array).subList(offset, offset+length)); } - public static<T> Stream<T> parallel(T[] array) { - return parallel(array, 0, array.length); + public static <T, L extends RandomAccess & List<T>> Stream<T> parallel(T[] array) { + return Streams.parallel((L) asList(array)); } public static<T> Stream<T> parallel(T[] array, int offset, int length) { - return new LinearPipeline<>(new ArrayParallelStreamAccessor<T>(array, offset, length)); + return Streams.stream(asList(array).subList(offset, offset + length)); } public static<T> Streamable<T> asStreamable(final T[] array, final int offset, final int length) {
--- a/src/share/classes/java/util/Collection.java Fri Sep 14 22:13:04 2012 -0700 +++ b/src/share/classes/java/util/Collection.java Fri Sep 14 22:16:13 2012 -0700 @@ -491,7 +491,7 @@ @Override Stream<E> stream() default { - return Streams.stream(this, size()); + return Streams.stream(this); } @Override
--- a/src/share/classes/java/util/Vector.java Fri Sep 14 22:13:04 2012 -0700 +++ b/src/share/classes/java/util/Vector.java Fri Sep 14 22:16:13 2012 -0700 @@ -25,6 +25,9 @@ package java.util; +import java.util.streams.Stream; +import java.util.streams.Streams; + /** * The {@code Vector} class implements a growable array of * objects. Like an array, it contains components that can be @@ -1209,4 +1212,9 @@ lastRet = -1; } } + + @Override + public Stream<E> parallel() { + return Streams.parallel(this); + } }
--- a/src/share/classes/java/util/streams/Streams.java Fri Sep 14 22:13:04 2012 -0700 +++ b/src/share/classes/java/util/streams/Streams.java Fri Sep 14 22:16:13 2012 -0700 @@ -91,6 +91,14 @@ return new LinearPipeline<>(new IteratorStreamAccessor<>(source)); } + public static <T, L extends RandomAccess & List<T>> Spliterator<T> spliterator(L source) { + return new RandomAccessListSpliterator(source); + } + + public static <T, L extends RandomAccess & List<T>> Stream<T> parallel(L source) { + return new LinearPipeline<>(new RandomAccessListParallelStreamAccessor<>(source)); + } + public static<T> Stream<T> parallel(Spliterator<T> source, int sizeOrEstimate) { return new LinearPipeline<>(new SpliteratorStreamAccessor<>(source, sizeOrEstimate)); } @@ -570,4 +578,132 @@ return iterator().curValue(); } } + + private static class RandomAccessListIterator<T, L extends RandomAccess & List<T>> implements Iterator<T> { + protected final L elements; + protected final int endOffset; + protected int offset; + + private RandomAccessListIterator(L elements, int startOffset, int len) { + this.elements = Objects.requireNonNull(elements); + this.endOffset = startOffset + len; + this.offset = startOffset; + + assert (offset >= 0) && ((offset < elements.size()) || (offset == endOffset)): "offset not in array"; + assert (endOffset >= offset) && (endOffset <= elements.size()) : "end not in array"; + } + + @Override + public boolean hasNext() { + return offset < endOffset; + } + + @Override + public T next() { + if(!hasNext()) { + throw new NoSuchElementException(); + } + return elements.get(offset++); + } + } + + private static class RandomAccessListSpliterator<T, L extends RandomAccess & List<T>> extends RandomAccessListIterator<T,L> implements Spliterator<T> { + boolean iterating = false; + + RandomAccessListSpliterator(L elements) { + this(elements, 0, elements.size()); + } + + RandomAccessListSpliterator(L elements, int offset, int length) { + super(elements, offset, length); + } + + @Override + public int getRemainingSizeIfKnown() { + return endOffset - offset; + } + + @Override + public boolean isPredictableSplits() { + return true; + } + + @Override + public void into(Sink<T, ?, ?> sink) { + sink.begin(getRemainingSizeIfKnown()); + if (offset < endOffset) { + // Strange-looking way to iterate; reduce heap write traffic + int wasOffset = offset; + offset = endOffset; + for (int i=wasOffset; i<endOffset; i++) + sink.accept(elements.get(i)); + } + sink.end(); + } + + @Override + public int getNaturalSplitArity() { + return (endOffset - offset > 1) ? 1 : 0; + } + + @Override + public Spliterator<T> split() { + if (iterating) + throw new IllegalStateException("split after iterate"); + int t = (endOffset - offset) / 2; + Spliterator<T> ret = new RandomAccessListSpliterator<>(elements, offset, t); + offset += t; + return ret; + } + + @Override + public Iterator<T> iterator() { + iterating = true; + return this; + } + } + + private static class RandomAccessListParallelStreamAccessor<T,L extends RandomAccess & List<T>> + extends RandomAccessListSpliterator<T,L> implements StreamAccessor<T> { + private final int size; + + RandomAccessListParallelStreamAccessor(L elements) { + this(elements, 0, elements.size()); + } + + RandomAccessListParallelStreamAccessor(L elements, int offset, int length) { + super(elements, offset, length); + this.size = length; + } + + @Override + public int getStreamFlags() { + return Stream.FLAG_SIZED; + } + + @Override + public int getSizeOrEstimate() { + return size; + } + + @Override + public boolean isParallel() { + return true; + } + + @Override + public StreamShape getShape() { + return StreamShape.SCALAR; + } + + @Override + public Iterator<T> iterator() { + return this; + } + + @Override + public Spliterator<T> spliterator() { + return this; + } + } }