changeset 6390:bd225e0a0200

Resurrect specialized stream/parallel implementations on ARrayList/Vector
author briangoetz
date Mon, 12 Nov 2012 20:16:28 -0500
parents b3fc2ac88d4c
children 218bdab80e30
files src/share/classes/java/util/ArrayList.java src/share/classes/java/util/Vector.java src/share/classes/java/util/streams/Streams.java test-ng/tests/org/openjdk/tests/java/util/streams/ops/FindFirstOpTest.java
diffstat 4 files changed, 91 insertions(+), 243 deletions(-) [+]
line wrap: on
line diff
--- a/src/share/classes/java/util/ArrayList.java	Mon Nov 12 17:07:34 2012 -0500
+++ b/src/share/classes/java/util/ArrayList.java	Mon Nov 12 20:16:28 2012 -0500
@@ -25,6 +25,10 @@
 
 package java.util;
 
+import java.util.streams.Stream;
+import java.util.streams.StreamOpFlags;
+import java.util.streams.Streams;
+
 /**
  * Resizable-array implementation of the <tt>List</tt> interface.  Implements
  * all optional list operations, and permits all elements, including
@@ -1129,21 +1133,15 @@
         }
     }
 
+    @Override
+    public Stream<E> stream() {
+        return Streams.stream(() -> Streams.spliterator((E[]) elementData, 0, size),
+                              StreamOpFlags.IS_ORDERED | StreamOpFlags.IS_SIZED);
+    }
 
-//    private class ArrayProxyImpl<E> implements Streams.ArrayProxy<E> {
-//        public E[] getArray() { return (E[]) ArrayList.this.elementData; }
-//        public int getOffset() { return 0; }
-//        public int getLen() { return ArrayList.this.size; }
-//        public int getModCount() { return ArrayList.this.modCount;  }
-//    }
-//
-//    @Override
-//    public Stream<E> stream() {
-//        return Streams.stream(new ArrayProxyImpl<E>());
-//    }
-//
-//    @Override
-//    public Stream<E> parallel() {
-//        return Streams.parallel(new ArrayProxyImpl<E>());
-//    }
+    @Override
+    public Stream<E> parallel() {
+        return Streams.parallel(() -> Streams.spliterator((E[]) elementData, 0, size),
+                                StreamOpFlags.IS_ORDERED | StreamOpFlags.IS_SIZED);
+    }
 }
--- a/src/share/classes/java/util/Vector.java	Mon Nov 12 17:07:34 2012 -0500
+++ b/src/share/classes/java/util/Vector.java	Mon Nov 12 20:16:28 2012 -0500
@@ -26,6 +26,7 @@
 package java.util;
 
 import java.util.streams.Stream;
+import java.util.streams.StreamOpFlags;
 import java.util.streams.Streams;
 
 /**
@@ -1213,20 +1214,15 @@
         }
     }
 
-//    private class ArrayProxyImpl<E> implements Streams.ArrayProxy<E> {
-//        public E[] getArray() { return (E[]) Vector.this.elementData; }
-//        public int getOffset() { return 0; }
-//        public int getLen() { return Vector.this.elementCount; }
-//        public int getModCount() { return Vector.this.modCount;  }
-//    }
-//
-//    @Override
-//    public Stream<E> stream() {
-//        return Streams.stream(new ArrayProxyImpl<E>());
-//    }
-//
-//    @Override
-//    public Stream<E> parallel() {
-//        return Streams.parallel(new ArrayProxyImpl<E>());
-//    }
+    @Override
+    public Stream<E> stream() {
+        return Streams.stream(() -> Streams.spliterator((E[]) elementData, 0, elementCount),
+                              StreamOpFlags.IS_ORDERED | StreamOpFlags.IS_SIZED);
+    }
+
+    @Override
+    public Stream<E> parallel() {
+        return Streams.parallel(() -> Streams.spliterator((E[]) elementData, 0, elementCount),
+                                StreamOpFlags.IS_ORDERED | StreamOpFlags.IS_SIZED);
+    }
 }
--- a/src/share/classes/java/util/streams/Streams.java	Mon Nov 12 17:07:34 2012 -0500
+++ b/src/share/classes/java/util/streams/Streams.java	Mon Nov 12 20:16:28 2012 -0500
@@ -44,7 +44,7 @@
     // Stream
 
     public static<U, T extends Sized & Iterable<U>> Stream<U> stream(T entity, int flags) {
-        return new ReferencePipeline<>(new Spliterator.Sequential<U>() {
+        return stream(new Spliterator.Sequential<U>() {
             @Override
             public Iterator<U> iterator() {
                 return entity.iterator();
@@ -63,7 +63,7 @@
     }
 
     public static<U, T extends Iterable<U>> Stream<U> stream(T entity, Sized sizeProvider, int flags) {
-        return new ReferencePipeline<>(new Spliterator.Sequential<U>() {
+        return stream(new Spliterator.Sequential<U>() {
             @Override
             public Iterator<U> iterator() {
                 return entity.iterator();
@@ -82,7 +82,7 @@
     }
 
     public static<U, T extends Iterable<U>> Stream<U> stream(T entity, int flags) {
-        return new ReferencePipeline<>(new Spliterator.Sequential<U>() {
+        return stream(new Spliterator.Sequential<U>() {
             @Override
             public Iterator<U> iterator() {
                 return entity.iterator();
@@ -97,7 +97,7 @@
     }
 
     public static<U, T extends Iterator<U>> Stream<U> stream(T iterator, int flags) {
-        return new ReferencePipeline<>(new Spliterator.Sequential<U>() {
+        return stream(new Spliterator.Sequential<U>() {
             @Override
             public Iterator<U> iterator() {
                 return iterator;
@@ -112,14 +112,12 @@
     }
 
     public static<U> Stream<U> stream(Spliterator.Sequential<U> spliterator, int flags) {
-        if (spliterator.getSizeIfKnown() >= 0)
-            flags |= StreamOpFlags.IS_SIZED;
         return new ReferencePipeline<>(spliterator, flags);
     }
 
-//    public static <T> Stream<T> stream(ArrayProxy<T> proxy) {
-//        return new ReferencePipeline<>(new ArrayProxyStreamAccessor<>(proxy));
-//    }
+    public static <T> Stream<T> stream(Factory<Spliterator<T>> proxy, int flags) {
+        return new ReferencePipeline<>(new ProxySpliterator<>(proxy), flags);
+    }
 
     public static <T> Spliterator<T> spliterator(T[] source) {
         return spliterator(source, 0, source.length);
@@ -135,8 +133,8 @@
 
     public static <T> Stream<T> stream(T[] source, int offset, int length) {
         // Note use of full-service Spliterator here -- harmless because PARALLEL flag is not set
-        return new ReferencePipeline<>(new ArraySpliterator<>(source, offset, length),
-                                   StreamOpFlags.IS_SIZED | StreamOpFlags.IS_ORDERED);
+        return stream(new ArraySpliterator<>(source, offset, length),
+                      StreamOpFlags.IS_SIZED | StreamOpFlags.IS_ORDERED);
     }
 
     public static <T> Stream<T> parallel(T[] source) {
@@ -144,8 +142,8 @@
     }
 
     public static <T> Stream<T> parallel(T[] source, int offset, int length) {
-        return new ReferencePipeline<>(new ArraySpliterator<>(source, offset, length),
-                                   StreamOpFlags.IS_SIZED | StreamOpFlags.IS_ORDERED | StreamOpFlags.IS_PARALLEL);
+        return stream(new ArraySpliterator<>(source, offset, length),
+                      StreamOpFlags.IS_SIZED | StreamOpFlags.IS_ORDERED | StreamOpFlags.IS_PARALLEL);
     }
 
     public static<T> Stream<T> parallel(Spliterator<T> spliterator, int flags) {
@@ -154,27 +152,15 @@
         return new ReferencePipeline<>(spliterator, flags | StreamOpFlags.IS_PARALLEL);
     }
 
-//    public static <T> Stream<T> parallel(ArrayProxy<T> proxy) {
-//        return new ReferencePipeline<>(new ArrayProxyParallelStreamAccessor<>(proxy));
-//    }
+    public static <T> Stream<T> parallel(Factory<Spliterator<T>> proxy, int flags) {
+        return new ReferencePipeline<>(new ProxySpliterator<>(proxy), flags | StreamOpFlags.IS_PARALLEL);
+    }
 
     @SuppressWarnings("unchecked")
     public static<T> Stream<T> emptyStream() {
         return EMPTY_STREAM;
     }
 
-    /**
-     * A small interface that defers getting of array data.
-     *
-     * @param <T> type of elements.
-     */
-//    public interface ArrayProxy<T> {
-//        int getModCount();
-//        T[] getArray();
-//        int getOffset();
-//        int getLen();
-//    }
-
     // Infinite streams
 
     public static<T> Stream<T> iterate(final T seed, final UnaryOperator<T> f) {
@@ -262,9 +248,6 @@
         return stream(cycle, StreamOpFlags.IS_ORDERED);
     }
 
-    // @@@ Need from(StreamAccessor) methods
-
-
     private static final Spliterator<?> EMPTY_SPLITERATOR = new Spliterator<Object>() {
             @Override
             public int getNaturalSplits() {
@@ -337,186 +320,6 @@
         }
     }
 
-//    private static class ArrayProxyIterator<T> implements Iterator<T> {
-//        protected final ArrayProxy<T> proxy;
-//        protected int expectedmodcount = -1;
-//        protected int offset = -1;
-//        protected int endOffset = -1;
-//
-//        private ArrayProxyIterator(ArrayProxy<T> proxy) {
-//            this.proxy = proxy;
-//        }
-//
-//        protected void comodificationCheck() {
-//            if(expectedmodcount == proxy.getModCount()) {
-//                return;
-//            }
-//
-//            if(-1 != expectedmodcount) {
-//                throw new ConcurrentModificationException();
-//            }
-//
-//            // initalize offset, end offset and leng
-//            expectedmodcount = proxy.getModCount();
-//            offset = proxy.getOffset();
-//            int len = proxy.getLen();
-//            assert len >= 0 : "invalid length";
-//
-//            endOffset = offset + len;
-//            assert (offset >= 0) && (offset <= endOffset) : "offset not in array";
-//            assert (offset <= proxy.getArray().length) && (endOffset <= proxy.getArray().length);
-//        }
-//
-//        @Override
-//        public boolean hasNext() {
-//            comodificationCheck();
-//
-//            return offset < endOffset;
-//        }
-//
-//        @Override
-//        public T next() {
-//            if(!hasNext()) {
-//                throw new NoSuchElementException();
-//            }
-//
-//            return proxy.getArray()[offset++];
-//        }
-//
-//        public void forEach(Block<? super T> sink) {
-//            comodificationCheck();
-//
-//            T[] elements = proxy.getArray();
-//
-//            for (int i=offset; i<endOffset; i++) {
-//                sink.apply(elements[i]);
-//            }
-//            // update only once; reduce heap write traffic
-//            offset = endOffset;
-//
-//            // too late to be useful but may find errors.
-//            comodificationCheck();
-//        }
-//    }
-
-//    private static class ArrayProxyStreamAccessor<T>
-//            extends ArrayProxyIterator<T> implements StreamAccessor.ForSequential<T> {
-//
-//        ArrayProxyStreamAccessor(ArrayProxy<T> proxy) {
-//            super(proxy);
-//        }
-//
-//        @Override
-//        public int getStreamFlags() {
-//            return StreamOpFlags.IS_SIZED | StreamOpFlags.IS_ORDERED;
-//        }
-//
-//        @Override
-//        public int getSizeIfKnown() {
-//            comodificationCheck();
-//
-//            return endOffset - offset;
-//        }
-//
-//        @Override
-//        public Iterator<T> iterator() {
-//            return this;
-//        }
-//    }
-
-//    private static class ArrayProxySpliterator<T> extends ArrayProxyStreamAccessor<T> implements Spliterator<T> {
-//        boolean traversing = false;
-//
-//        ArrayProxySpliterator(ArrayProxy<T> proxy) {
-//            super(proxy);
-//        }
-//
-//        /**
-//         * Constructor for sub-splits.
-//         *
-//         * @param proxy
-//         * @param offset offset of sub-split.
-//         * @param len
-//         */
-//        ArrayProxySpliterator(ArrayProxy<T> proxy, int offset, int len) {
-//            super(proxy);
-//            this.offset = offset;
-//            this.endOffset = offset + len;
-//            this.expectedmodcount = proxy.getModCount();
-//        }
-//
-//        @Override
-//        public int getSizeIfKnown() {
-//            comodificationCheck();
-//
-//            return endOffset - offset;
-//        }
-//
-//        @Override
-//        public int estimateSize() {
-//            return getSizeIfKnown();
-//        }
-//
-//        @Override
-//        public boolean isPredictableSplits() {
-//            return true;
-//        }
-//
-//        @Override
-//        public int getNaturalSplits() {
-//            comodificationCheck();
-//
-//            return (endOffset - offset > 1) && !traversing ? 1 : 0;
-//        }
-//
-//        @Override
-//        public Spliterator<T> split() {
-//            comodificationCheck();
-//
-//            if (traversing) {
-//                throw new IllegalStateException("split after starting traversal");
-//            }
-//
-//            int t = (endOffset - offset) / 2;
-//            ArrayProxySpliterator<T> ret = new ArrayProxySpliterator<>(proxy, offset, t);
-//            if(ret.expectedmodcount != expectedmodcount) {
-//                throw new ConcurrentModificationException();
-//            }
-//            offset += t;
-//            return ret;
-//        }
-//
-//        @Override
-//        public Iterator<T> iterator() {
-//            traversing = true;
-//            return this;
-//        }
-//
-//        @Override
-//        public void forEach(Block<? super T> block) {
-//           traversing = true;
-//           super.forEach(block);
-//        }
-//    }
-
-//    private static class ArrayProxyParallelStreamAccessor<T> extends ArrayProxySpliterator<T> {
-//
-//        ArrayProxyParallelStreamAccessor(ArrayProxy<T> proxy) {
-//            super(proxy);
-//        }
-//
-//        @Override
-//        public boolean isParallel() {
-//            return true;
-//        }
-//
-//        @Override
-//        public Spliterator<T> spliterator() {
-//            comodificationCheck();
-//            return this;
-//        }
-//    }
-
     private static class ArraySpliterator<T> extends ArrayIterator<T> implements Spliterator<T> {
         boolean traversing = false;
 
@@ -575,4 +378,54 @@
             return this;
         }
     }
+
+    private static class ProxySpliterator<T> implements Spliterator<T> {
+        private final Factory<Spliterator<T>> factory;
+        private Spliterator<T> spliterator;
+
+        private ProxySpliterator(Factory<Spliterator<T>> factory) {
+            this.factory = factory;
+        }
+
+        private Spliterator<T> spliterator() {
+            if (spliterator == null)
+                spliterator = factory.make();
+            return spliterator;
+        }
+
+        @Override
+        public int getNaturalSplits() {
+            return spliterator().getNaturalSplits();
+        }
+
+        @Override
+        public Spliterator<T> split() {
+            return spliterator().split();
+        }
+
+        @Override
+        public Iterator<T> iterator() {
+            return spliterator().iterator();
+        }
+
+        @Override
+        public void forEach(Block<? super T> block) {
+            spliterator().forEach(block);
+        }
+
+        @Override
+        public int getSizeIfKnown() {
+            return spliterator().getSizeIfKnown();
+        }
+
+        @Override
+        public int estimateSize() {
+            return spliterator().estimateSize();
+        }
+
+        @Override
+        public boolean isPredictableSplits() {
+            return spliterator().isPredictableSplits();
+        }
+    }
 }
--- a/test-ng/tests/org/openjdk/tests/java/util/streams/ops/FindFirstOpTest.java	Mon Nov 12 17:07:34 2012 -0500
+++ b/test-ng/tests/org/openjdk/tests/java/util/streams/ops/FindFirstOpTest.java	Mon Nov 12 20:16:28 2012 -0500
@@ -56,6 +56,7 @@
     public void testFindFirstParallel() {
         assertFalse(Collections.<Integer>emptySet().parallel().findFirst().isPresent(), "no result");
         assertFalse(countTo(1000).parallel().filter(x -> x > 1000).findFirst().isPresent(), "no result");
+        assertEquals(999, (int) countTo(1000).parallel().filter(x -> x == 999).findFirst().get(), "999 is present");
         assertEquals(2, (int) countTo(1000).parallel().filter(pEven).findFirst().get(), "first even number is 2");
     }