changeset 6061:f673f3d4682f

Replacement for VariableStreamBuilder that doesn't do re-allocs. Better chunk sizing will come in future updates.
author mduigou
date Thu, 04 Oct 2012 18:26:03 -0700
parents 3761ee74342c
children f847b6e6c00b
files src/share/classes/java/util/streams/StreamBuilders.java test-ng/tests/org/openjdk/tests/java/util/streams/ops/StreamOpTestCase.java
diffstat 2 files changed, 194 insertions(+), 20 deletions(-) [+]
line wrap: on
line diff
--- a/src/share/classes/java/util/streams/StreamBuilders.java	Thu Oct 04 15:34:07 2012 -0700
+++ b/src/share/classes/java/util/streams/StreamBuilders.java	Thu Oct 04 18:26:03 2012 -0700
@@ -26,26 +26,29 @@
 
 import java.util.*;
 import java.util.functions.Block;
+import java.util.logging.Logger;
 
 /**
  * Utilities for building streams.
- *
+ * <p/>
  * @author Brian Goetz
  */
 public class StreamBuilders {
     // No instances
-    private StreamBuilders() { }
 
-    public static<T> StreamBuilder<T> makeFixed(int size) {
+    private StreamBuilders() {
+    }
+
+    public static <T> StreamBuilder<T> makeFixed(int size) {
         return new FixedStreamBuilder<>(size);
     }
 
-    public static<T> StreamBuilder<T> make() {
-        return new VariableStreamBuilder<>();
+    public static <T> StreamBuilder<T> make() {
+        return new SimpleSpinedStreamBuilder<>();
     }
 
-    public static<T> StreamBuilder<T> make(int initialSize) {
-        return new VariableStreamBuilder<>(initialSize);
+    public static <T> StreamBuilder<T> make(int initialSize) {
+        return new SimpleSpinedStreamBuilder<>(initialSize);
     }
 
     private static boolean equals(StreamBuilder a, StreamBuilder b) {
@@ -62,27 +65,28 @@
         return true;
     }
 
-    private static<T> int hashCode(StreamBuilder<T> sb) {
+    private static <T> int hashCode(StreamBuilder<T> sb) {
         int h = 0;
         for (T t : sb) {
-            h = 31 * h + t.hashCode();
+            h = 31 * h + Objects.hashCode(t);
         }
         return h;
     }
 
     private static class FixedStreamBuilder<T> extends AbstractCollection<T> implements StreamBuilder<T> {
+
         private final T[] array;
         private int curSize;
 
         public FixedStreamBuilder(int size) {
-            array = (T[]) new Object[size];
+            array = (T[])new Object[size];
         }
 
         @Override
         public void begin(int size) {
             curSize = 0;
-            if(size > array.length) {
-                System.out.println("Estimate greater than length. There might be blood.");
+            if (size > array.length) {
+                Logger.getLogger(StreamBuilders.class.getName()).warning("Estimate greater than length. There might be blood.");
             }
         }
 
@@ -97,7 +101,7 @@
 
         @Override
         public void forEach(Block<? super T> block) {
-            for (int i=0; i<curSize; i++) {
+            for (int i = 0; i < curSize; i++) {
                 block.apply(array[i]);
             }
         }
@@ -126,8 +130,7 @@
         public Object[] toArray() {
             if (curSize == array.length) {
                 return array;
-            }
-            else {
+            } else {
                 // @@@ Should this throw ISE instead?
                 return Arrays.copyOf(array, curSize);
             }
@@ -142,17 +145,189 @@
         public boolean equals(Object obj) {
             if (!(obj instanceof StreamBuilder))
                 return false;
-            return StreamBuilders.equals(this, (StreamBuilder) obj);
+            return StreamBuilders.equals(this, (StreamBuilder)obj);
         }
 
         @Override
         public String toString() {
-            return String.format("FixedStreamBuilder[%d][%s]", array.length-curSize, Arrays.toString(array));
+            return String.format("FixedStreamBuilder[%d][%s]", array.length - curSize, Arrays.toString(array));
+        }
+    }
+
+    private static class SimpleSpinedStreamBuilder<T> extends AbstractCollection<T> implements StreamBuilder<T> {
+
+        private final static int CHUNKS_SIZE = 32;
+        private final static int CHUNK_SIZE = 1024;
+        private T[][] chunks = (T[][])new Object[CHUNKS_SIZE][];
+        private int size = 0;
+        private int capacity = 0;
+
+        private SimpleSpinedStreamBuilder(int initialSize) {
+            ensureCapacity(initialSize);
+        }
+
+        private SimpleSpinedStreamBuilder() {
+            this(16);
+        }
+
+        private void ensureCapacity(int capacity) {
+            while (this.capacity < capacity) {
+                int chunk = this.capacity / CHUNK_SIZE;
+                if (chunk >= chunks.length) {
+                    chunks = Arrays.copyOf(chunks, chunks.length + CHUNKS_SIZE);
+                }
+
+                chunks[chunk] = (T[])new Object[CHUNK_SIZE];
+                this.capacity += CHUNK_SIZE;
+            }
+        }
+
+        @Override
+        public void begin(int size) {
+            clear();
+            if (size >= 0) {
+                ensureCapacity(size);
+            }
+        }
+
+        @Override
+        public void accept(T t) {
+            int chunk = size / CHUNK_SIZE;
+            int inChunk = size % CHUNK_SIZE;
+            if (0 == inChunk) {
+                ensureCapacity(size + 1);
+            }
+
+            chunks[chunk][inChunk] = t;
+            size++;
+        }
+
+        @Override
+        public int size() {
+            return size;
+        }
+
+        @Override
+        public Stream<T> stream() {
+            return Streams.stream(this, size());
+        }
+
+        @Override
+        public void forEach(Block<? super T> block) {
+            int finalChunk = size / CHUNK_SIZE;
+            int finalIndex = size % CHUNK_SIZE;
+
+            // full chunks
+            int chunk = 0;
+            for (chunk = 0; chunk < finalChunk; chunk++) {
+                for (T t : chunks[chunk]) {
+                    block.apply(t);
+                }
+            }
+
+            // final chunk.
+            T[] partialChunk = chunks[chunk];
+            for (int index = 0; index < finalIndex; index++) {
+                block.apply(partialChunk[index]);
+            }
+        }
+
+        @Override
+        public Iterator<T> iterator() {
+            Iterator<T> result = (Iterator<T>) new Iterator() {
+                final int finalChunk = size / CHUNK_SIZE;
+                int chunk = -1;
+                int inChunk;
+                int nextChunk;
+
+                @Override
+                public boolean hasNext() {
+                    return (chunk < finalChunk) || (finalChunk == chunk && inChunk < nextChunk);
+                }
+
+                @Override
+                public T next() {
+                    if (!hasNext()) {
+                        throw new NoSuchElementException();
+                    }
+
+                    T result = chunks[chunk][inChunk++];
+
+                    if (inChunk == nextChunk) {
+                        advanceChunk();
+                    }
+
+                    return result;
+                }
+
+                private void advanceChunk() {
+                    chunk++;
+                    inChunk = 0;
+                    nextChunk += (finalChunk == chunk)
+                            ? size % CHUNK_SIZE
+                            : CHUNK_SIZE;
+                }
+
+                {
+                    advanceChunk();
+                }
+            };
+
+            return result;
+        }
+
+        @Override
+        public void clear() {
+            size = 0;
+        }
+
+        @Override
+        public Object[] toArray() {
+            Object[] result = new Object[size];
+
+            int finalChunk = size / CHUNK_SIZE;
+            int finalIndex = size % CHUNK_SIZE;
+            int index = 0;
+            int chunk = 0;
+
+            // full chunks
+            while (chunk < finalChunk) {
+                System.arraycopy(chunks[chunk], 0, result, index, CHUNK_SIZE);
+                index += CHUNK_SIZE;
+            }
+
+            // final bit.
+            System.arraycopy(chunks[chunk], 0, result, index, finalIndex);
+
+            return result;
+        }
+
+        @Override
+        public int hashCode() {
+            return StreamBuilders.hashCode(this);
+        }
+
+        @Override
+        public String toString() {
+            return String.format("SimpleSpinedStreamBuilder[%s]", Arrays.toString(toArray()));
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (!(obj instanceof StreamBuilder))
+                return false;
+            return StreamBuilders.equals(this, (StreamBuilder)obj);
+        }
+
+        @Override
+        public boolean isEmpty() {
+            return size != 0;
         }
     }
 
     private static class VariableStreamBuilder<T> extends AbstractCollection<T> implements StreamBuilder<T> {
         // Junky initial implementation that involves excessive copying.
+
         private final ArrayList<T> list;
 
         private VariableStreamBuilder(int initialSize) {
@@ -166,7 +341,7 @@
         @Override
         public void begin(int size) {
             list.clear();
-            if(size >= 0) {
+            if (size >= 0) {
                 list.ensureCapacity(size);
             }
         }
@@ -220,7 +395,7 @@
         public boolean equals(Object obj) {
             if (!(obj instanceof StreamBuilder))
                 return false;
-            return StreamBuilders.equals(this, (StreamBuilder) obj);
+            return StreamBuilders.equals(this, (StreamBuilder)obj);
         }
     }
 }
--- a/test-ng/tests/org/openjdk/tests/java/util/streams/ops/StreamOpTestCase.java	Thu Oct 04 15:34:07 2012 -0700
+++ b/test-ng/tests/org/openjdk/tests/java/util/streams/ops/StreamOpTestCase.java	Thu Oct 04 18:26:03 2012 -0700
@@ -248,7 +248,6 @@
         return refResult;
     }
 
-    @SuppressWarnings("unchecked")
     protected static<T> void assertMatches(StreamResult<T> refResult, Block<Sink<T,?,?>> block) {
         assertMatches(refResult, block, null);
     }