changeset 6050:93fcce3ec005

adds sized flag for SortedOp output. docs improvements, testing and diagnostics improvements.
author mduigou
date Tue, 25 Sep 2012 14:04:47 -0700
parents 1c856759159e
children 7fec616fd4eb
files src/share/classes/java/util/streams/BaseStream.java src/share/classes/java/util/streams/MapStreamAccessor.java src/share/classes/java/util/streams/StreamAccessor.java src/share/classes/java/util/streams/Streamable.java src/share/classes/java/util/streams/ops/SortedOp.java test-ng/tests/org/openjdk/tests/java/util/streams/StreamTestDataProvider.java test-ng/tests/org/openjdk/tests/java/util/streams/ops/FlatMapOpTest.java test-ng/tests/org/openjdk/tests/java/util/streams/ops/StreamOpTestCase.java
diffstat 8 files changed, 96 insertions(+), 45 deletions(-) [+]
line wrap: on
line diff
--- a/src/share/classes/java/util/streams/BaseStream.java	Mon Sep 24 15:10:48 2012 -0700
+++ b/src/share/classes/java/util/streams/BaseStream.java	Tue Sep 25 14:04:47 2012 -0700
@@ -29,19 +29,33 @@
 /**
  * BaseStream
  *
+ * @param <T_ELT> Type of stream elements.
+ * @param <T_ITER> Type of stream element iterator.
+ * 
  * @author Brian Goetz
  */
 public interface BaseStream<T_ELT, T_ITER extends Iterator<T_ELT>> {
     /**
      * Return the iterator for the elements of this stream. The same iterator
-     * instance is returned for every invocation.  Once the elements of the stream are
-     * consumed it is not possible to "rewind" the stream.
+     * instance is returned for every invocation.  Once the elements of the
+     * stream are consumed it is not possible to "rewind" the stream.
      *
      * @return the element iterator for this stream.
      */
     T_ITER iterator();
 
+    /**
+     * Returns {@code true} if this stream may be split for parallel
+     * processing.
+     *
+     * @return {@code true} if this stream may be split for parallel processing.
+     */
     boolean isParallel();
 
+    /**
+     * Returns the shape of elements of this stream.
+     *
+     * @return the shape of elements of this stream.
+     */
     StreamShape getShape();
 }
--- a/src/share/classes/java/util/streams/MapStreamAccessor.java	Mon Sep 24 15:10:48 2012 -0700
+++ b/src/share/classes/java/util/streams/MapStreamAccessor.java	Tue Sep 25 14:04:47 2012 -0700
@@ -30,7 +30,10 @@
 
 /**
  * MapStreamAccessor
- * <p/>
+ *
+ * @param <K> Type of element keys.
+ * @param <V> Type of element values.
+ *
  * @author Brian Goetz
  */
 public interface MapStreamAccessor<K, V> extends StreamAccessor<Mapping<K, V>>, MapIterator<K, V> {
--- a/src/share/classes/java/util/streams/StreamAccessor.java	Mon Sep 24 15:10:48 2012 -0700
+++ b/src/share/classes/java/util/streams/StreamAccessor.java	Tue Sep 25 14:04:47 2012 -0700
@@ -34,8 +34,20 @@
  * @author Brian Goetz
  */
 public interface StreamAccessor<T> {
+    /**
+     * Provides any remaining elements into the provided sink.
+     *
+     * @param sink The sink to which elements will be provided.
+     */
     void into(Sink<T, ?, ?> sink);
 
+    /**
+     * Return the iterator for the elements of this stream. The same iterator
+     * instance is returned for every invocation.  Once the elements of the
+     * stream are consumed it is not possible to "rewind" the stream.
+     *
+     * @return the element iterator for this stream.
+     */
     Iterator<T> iterator();
 
     public int getStreamFlags();
--- a/src/share/classes/java/util/streams/Streamable.java	Mon Sep 24 15:10:48 2012 -0700
+++ b/src/share/classes/java/util/streams/Streamable.java	Tue Sep 25 14:04:47 2012 -0700
@@ -25,13 +25,13 @@
 package java.util.streams;
 
 /**
- * Streamable
+ * Provides elements as a stream.
  *
- * @param <T> Type of stream elements.
+ * @param <S> Type of stream.
  *
  * @author Brian Goetz
  */
-public interface Streamable<T extends BaseStream> {
-    T stream();
-    T parallel() default { return stream(); }
+public interface Streamable<S extends BaseStream> {
+    S stream();
+    S parallel() default { return stream(); }
 }
--- a/src/share/classes/java/util/streams/ops/SortedOp.java	Mon Sep 24 15:10:48 2012 -0700
+++ b/src/share/classes/java/util/streams/ops/SortedOp.java	Tue Sep 25 14:04:47 2012 -0700
@@ -69,11 +69,11 @@
             @Override
             public void begin(int size) {
                 pq = new PriorityQueue<>(size > 0 ? size : DEFAULT_PRIORITY_QUEUE_SIZE, comparator);
-                downstream.begin(size);
             }
 
             @Override
             public void end() {
+                downstream.begin(pq.size());
                 while (!pq.isEmpty())
                     downstream.accept(pq.remove());
                 downstream.end();
@@ -114,6 +114,6 @@
 
     @Override
     public int getStreamFlags(int upstreamFlags) {
-        return upstreamFlags | Stream.FLAG_SORTED;
+        return upstreamFlags | Stream.FLAG_SORTED | Stream.FLAG_SIZED;
     }
 }
--- a/test-ng/tests/org/openjdk/tests/java/util/streams/StreamTestDataProvider.java	Mon Sep 24 15:10:48 2012 -0700
+++ b/test-ng/tests/org/openjdk/tests/java/util/streams/StreamTestDataProvider.java	Tue Sep 25 14:04:47 2012 -0700
@@ -69,7 +69,7 @@
         }
     }
 
-    static Object[][] arrays = {
+    static final Object[][] arrays = {
             {"empty", to0},
             {"0..1", to1},
             {"0..10", to10},
@@ -84,10 +84,11 @@
     static {
         List<Object[]> list = new ArrayList<>();
         for (Object[] data : arrays) {
-            list.add(new Object[] { "array:" + data[0], new StreamOpTestCase.ArrayTestData<>((Integer[]) data[1])});
-            list.add(new Object[] { "ArrayList:" + data[0], new StreamOpTestCase.CollectionTestData<>(new ArrayList<>(Arrays.asList((Integer[]) data[1]))) });
-            list.add(new Object[] { "HashSet:" + data[0], new StreamOpTestCase.CollectionTestData<>(new HashSet<>(Arrays.asList((Integer[]) data[1]))) });
-            list.add(new Object[] { "TreeSet:" + data[0], new StreamOpTestCase.CollectionTestData<>(new TreeSet<>(Arrays.asList((Integer[]) data[1]))) });
+            list.add(new Object[] { "array:" + data[0], new StreamOpTestCase.ArrayTestData<>("array:" + data[0], (Integer[]) data[1])});
+            list.add(new Object[] { "Arrays.asList:" + data[0], new StreamOpTestCase.CollectionTestData<>("Arrays.asList:" + data[0], Arrays.asList((Integer[]) data[1])) });
+            list.add(new Object[] { "ArrayList:" + data[0], new StreamOpTestCase.CollectionTestData<>("ArrayList:" + data[0], new ArrayList<>(Arrays.asList((Integer[]) data[1]))) });
+            list.add(new Object[] { "HashSet:" + data[0], new StreamOpTestCase.CollectionTestData<>("HashSet:" + data[0], new HashSet<>(Arrays.asList((Integer[]) data[1]))) });
+            list.add(new Object[] { "TreeSet:" + data[0], new StreamOpTestCase.CollectionTestData<>("TreeSet:" + data[0], new TreeSet<>(Arrays.asList((Integer[]) data[1]))) });
         }
         testData = list.toArray(new Object[0][]);
     }
--- a/test-ng/tests/org/openjdk/tests/java/util/streams/ops/FlatMapOpTest.java	Mon Sep 24 15:10:48 2012 -0700
+++ b/test-ng/tests/org/openjdk/tests/java/util/streams/ops/FlatMapOpTest.java	Tue Sep 25 14:04:47 2012 -0700
@@ -66,8 +66,8 @@
         assertCountSum(countTo(10).stream().flatMap(mfNull), 0, 0);
         assertCountSum(countTo(3).stream().flatMap(mfLt), 6, 4);
 
-        exerciseOps(new ArrayTestData<>(stringsArray), new FlatMapOp<>(flattenChars));
-        exerciseOps(new ArrayTestData<>(new String[]{LONG_STRING}), new FlatMapOp<>(flattenChars));
+        exerciseOps(new ArrayTestData<>("stringsArray", stringsArray), new FlatMapOp<>(flattenChars));
+        exerciseOps(new ArrayTestData<>("LONG_STRING", new String[]{LONG_STRING}), new FlatMapOp<>(flattenChars));
     }
 
     @Test(dataProvider = "opArrays", dataProviderClass = StreamTestDataProvider.class)
--- a/test-ng/tests/org/openjdk/tests/java/util/streams/ops/StreamOpTestCase.java	Mon Sep 24 15:10:48 2012 -0700
+++ b/test-ng/tests/org/openjdk/tests/java/util/streams/ops/StreamOpTestCase.java	Tue Sep 25 14:04:47 2012 -0700
@@ -69,7 +69,7 @@
 
             <T> void run(TestData<T> data, Sink sink, IntermediateOp[] ops) {
                 Sink<T, ?, ?> wrapped = sink(sink, ops);
-                wrapped.begin(-1);
+                wrapped.begin(data.size());
                 data.forEach(wrapped);
                 wrapped.end();
             }
@@ -227,7 +227,7 @@
             IntermediateOp[] ops = fops.make();
             if (test.isApplicable(ops)) {
                 before.apply(data);
-                assertMatches(refResult, sink -> test.run(data, sink, ops));
+                assertMatches(refResult, sink -> test.run(data, sink, ops), data.toString() + " " + test + " " + Arrays.toString(ops) );
                 after.apply(data);
             }
         }
@@ -235,11 +235,19 @@
         return refResult;
     }
 
-    @SuppressWarnings("rawtypes")
-    protected static<T> void assertMatches(StreamResult<T> refResult, Block<Sink> block) {
-        StreamResult<T> newResult = new StreamResult<>();
+    @SuppressWarnings("unchecked")
+    protected static<T> void assertMatches(StreamResult<T> refResult, Block<Sink<T,?,?>> block) {
+        assertMatches(refResult, block, null);
+    }
+
+    @SuppressWarnings("unchecked")
+    private static<T> void assertMatches(StreamResult<T> refResult, Block<Sink<T,?,?>> block, String message) {
+        StreamResult<T> newResult = new StreamResult<>(refResult.size());
         block.apply(newResult);
-        assertEquals(refResult, newResult);
+        if(!Objects.equals(refResult, newResult)) {
+            System.out.printf("msg: %s\nref: %s\nnew: %s\n", message, refResult, newResult);
+        }
+        assertEquals(refResult, newResult, message);
     }
 
     @SuppressWarnings("rawtypes")
@@ -356,23 +364,6 @@
         return answer;
     }
 
-    protected static <U> void assertEquals(StreamResult<U> sink1, StreamResult<U> sink2) {
-        try {
-            assertEquals(sink1.size(), sink2.size());
-            Iterator<U> it1 = sink1.iterator();
-            Iterator<U> it2 = sink2.iterator();
-            while (it1.hasNext()) {
-                assertTrue(it2.hasNext());
-                assertEquals(it1.next(), it2.next());
-            }
-            assertFalse(it2.hasNext());
-        }
-        catch (AssertionError e) {
-            System.out.printf("Expected %s, found %s%n", sink1, sink2);
-            throw e;
-        }
-    }
-
     static class StreamResult<T> implements Sink.OfValue<T>, Traversable<T>, Sized {
 
         private T[] array;
@@ -384,6 +375,31 @@
             array = (T[]) new Object[initialSize];
         }
 
+        @Override
+        public boolean equals(Object other) {
+            if(other instanceof Sized) {
+                if(((Sized) other).size() == size()) {
+                    if(other instanceof Iterable) {
+                        @SuppressWarnings("unchecked")
+                        Iterator<T> eachOther = ((Iterable<T>) other).iterator();
+                        for (int i = 0; i < offset; i++) {
+                            if(!eachOther.hasNext()) {
+                                return false;
+                            }
+
+                            if(!Objects.equals(eachOther.next(), array[i])) {
+                                return false;
+                            }
+                        }
+
+                        return !eachOther.hasNext();
+                    }
+                }
+            }
+
+            return false;
+        }
+
         public StreamResult() {
             this(16);
         }
@@ -405,6 +421,7 @@
             if (offset == array.length) {
                 array = Arrays.copyOf(array, Math.max(array.length, 1) * 2);
             }
+            System.out.flush();
             array[offset++] = t;
         }
 
@@ -415,7 +432,7 @@
 
         @Override
         public String toString() {
-            return String.format("%s[%d](%s)", super.toString(), offset, Arrays.toString(array));
+            return String.format("%s[%d](%s)", getClass().getSimpleName(), offset, Arrays.asList(array).subList(0,offset).toString());
         }
 
         public Object[] toArray() {
@@ -586,9 +603,11 @@
     }
 
     public static class ArrayTestData<T> implements TestData<T> {
+        private final String name;
         private final T[] array;
 
-        public ArrayTestData(T[] array) {
+        public ArrayTestData(String name, T[] array) {
+            this.name = name;
             this.array = array;
         }
 
@@ -627,14 +646,16 @@
 
         @Override
         public String toString() {
-            return Arrays.toString(array);
+            return name;
         }
     }
 
     public static class CollectionTestData<T> implements TestData<T> {
+        private final String name;
         private final Collection<T> collection;
 
-        public CollectionTestData(Collection<T> collection) {
+        public CollectionTestData(String name, Collection<T> collection) {
+            this.name = name;
             this.collection = collection;
         }
 
@@ -674,7 +695,7 @@
 
         @Override
         public String toString() {
-            return collection.toString();
+            return name;
         }
     }