changeset 6339:532efd9be743

Implement groupBy for both ordered and unordered sources Contributed-By: paul.sandoz@oracle.com
author briangoetz
date Mon, 22 Oct 2012 17:04:59 -0400
parents 3ddb33ecd2ab
children f0ca8e57004a
files src/share/classes/java/util/streams/ops/FlagDeclaringOp.java src/share/classes/java/util/streams/ops/GroupByOp.java test-ng/tests/org/openjdk/tests/java/util/streams/ops/GroupByOpTest.java test-ng/tests/org/openjdk/tests/java/util/streams/ops/StreamOpTestCase.java
diffstat 4 files changed, 168 insertions(+), 81 deletions(-) [+]
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/share/classes/java/util/streams/ops/FlagDeclaringOp.java	Mon Oct 22 17:04:59 2012 -0400
@@ -0,0 +1,56 @@
+/*
+ * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.  Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+package java.util.streams.ops;
+
+import java.util.Iterator;
+import java.util.streams.Sink;
+
+/**
+ * An operation that injects or clears flags but otherwise performs no operation on elements.
+ *
+ */
+public class FlagDeclaringOp<T> implements IntermediateOp<T, T> {
+    private final int flags;
+
+    public FlagDeclaringOp(int flags) {
+        this.flags = flags;
+    }
+
+    @Override
+    public int getOpFlags() {
+        return flags;
+    }
+
+    @Override
+    public Iterator<T> wrapIterator(final Iterator<T> source) {
+        return source;
+    }
+
+    @Override
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    public Sink<T> wrapSink(Sink sink) {
+        return sink;
+    }
+}
--- a/src/share/classes/java/util/streams/ops/GroupByOp.java	Mon Oct 22 13:07:08 2012 -0400
+++ b/src/share/classes/java/util/streams/ops/GroupByOp.java	Mon Oct 22 17:04:59 2012 -0400
@@ -28,15 +28,11 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.functions.Factory;
 import java.util.functions.Mapper;
-import java.util.logging.Level;
-import java.util.logging.Logger;
 import java.util.streams.*;
 
 /**
  * GroupByOp
  * <p>If an element maps to a <code>null</code> key a {@link NullPointerException} will be thrown.</p>
- * <p>Encounter order is not preserved for the collection of grouped elements,
- * the order of the grouped elements is undefined.</p>
  *
  * @param <T> Type of elements to be grouped.
  * @param <K> Type of elements in the resulting Map.
@@ -61,63 +57,81 @@
 
     // Public for tests
     public TerminalSink<T, Map<K, Collection<T>>> sink() {
-        return new TerminalSink<T, Map<K, Collection<T>>>() {
-            private Map<K, Collection<T>> map;
-
-            @Override
-            public void begin(int size) {
-                map = new HashMap<>();
-            }
-
-            @Override
-            public Map<K, Collection<T>> getAndClearState() {
-                Map<K, Collection<T>> result = map;
-                map = null;
-                return result;
-            }
-
-            @Override
-            public void accept(T t) {
-                K key = Objects.requireNonNull(mapper.map(t), String.format("The element %s cannot be mapped to a null key", t));
-                Collection<T> c = map.get(key);
-                if (c == null) {
-                    c = valueFactory.make();
-                    map.put(key, c);
-                }
-                c.add(t);
-            }
-        };
+        return new GroupBySink();
     }
 
     @Override
     public <S> Map<K, Collection<T>> evaluateSequential(PipelineHelper<S, T> helper) {
-        return helper.into(sink()).getAndClearState();
+        return helper.into(new GroupBySink()).getAndClearState();
     }
 
     @Override
     public <S> Map<K, Collection<T>> evaluateParallel(ParallelPipelineHelper<S, T> helper) {
-        // @@@ If encounter order is required then a different algorithm will need to be used that preserves
-        //     the order of the grouped elements, otherwise the order does not need to be preserved
         if (StreamOpFlags.ORDERED.isKnown(helper.getFlags())) {
-            Logger.getLogger(getClass().getName()).log(Level.WARNING, "GroupByOp.evaluateParallel does not preserve encounter order");
+            return OpUtils.parallelReduce(helper, () -> new GroupBySink());
+        }
+        else {
+            final ConcurrentHashMap<K, Collection<T>> map = new ConcurrentHashMap<>();
+
+            // Cache the sink chain, so it can be reused by all F/J leaf tasks
+            Sink<S> sinkChain = helper.wrapSink(new Sink.OfValue<T>() {
+                @Override
+                public void accept(T t) {
+                    K key = Objects.requireNonNull(mapper.map(t), String.format("The element %s cannot be mapped to a null key", t));
+                    final Collection<T> sb = map.computeIfAbsent(key, (k) -> valueFactory.make());
+                    synchronized (sb) {
+                        sb.add(t);
+                    }
+                }
+            });
+
+            OpUtils.parallelForEach(helper, sinkChain);
+
+            return map;
+        }
+    }
+
+    private class GroupBySink implements OpUtils.AccumulatingSink<T, Map<K, Collection<T>>, GroupBySink> {
+        Map<K, Collection<T>> map;
+
+        @Override
+        public void begin(int size) {
+            map = new HashMap<>();
         }
 
-        final ConcurrentHashMap<K, Collection<T>> map = new ConcurrentHashMap<>();
+        @Override
+        public void clearState() {
+            map = null;
+        }
 
-        // Cache the sink chain, so it can be reused by all F/J leaf tasks
-        Sink<S> sinkChain = helper.wrapSink(new Sink.OfValue<T>() {
-            @Override
-            public void accept(T t) {
-                K key = Objects.requireNonNull(mapper.map(t), String.format("The element %s cannot be mapped to a null key", t));
-                final Collection<T> sb = map.computeIfAbsent(key, (k) -> valueFactory.make());
-                synchronized (sb) {
-                    sb.add(t);
+        @Override
+        public Map<K, Collection<T>> getAndClearState() {
+            Map<K, Collection<T>> result = map;
+            map = null;
+            return result;
+        }
+
+        @Override
+        public void accept(T t) {
+            K key = Objects.requireNonNull(mapper.map(t), String.format("The element %s cannot be mapped to a null key", t));
+            Collection<T> c = map.get(key);
+            if (c == null) {
+                c = valueFactory.make();
+                map.put(key, c);
+            }
+            c.add(t);
+        }
+
+        @Override
+        public void combine(GroupBySink other) {
+            for (Map.Entry<K, Collection<T>> e : other.map.entrySet()) {
+                if (map.containsKey(e.getKey())) {
+                    map.get(e.getKey()).addAll(e.getValue());
+                }
+                else {
+                    map.put(e.getKey(), e.getValue());
                 }
             }
-        });
-
-        OpUtils.parallelForEach(helper, sinkChain);
-
-        return map;
+        }
     }
 }
--- a/test-ng/tests/org/openjdk/tests/java/util/streams/ops/GroupByOpTest.java	Mon Oct 22 13:07:08 2012 -0400
+++ b/test-ng/tests/org/openjdk/tests/java/util/streams/ops/GroupByOpTest.java	Mon Oct 22 17:04:59 2012 -0400
@@ -28,9 +28,11 @@
 import org.testng.annotations.Test;
 
 import java.util.*;
-import java.util.functions.Mappers;
+import java.util.functions.*;
 import java.util.streams.Stream;
+import java.util.streams.StreamOpFlags;
 import java.util.streams.ops.GroupByOp;
+import java.util.streams.ops.FlagDeclaringOp;
 
 import static org.openjdk.tests.java.util.LambdaTestHelpers.*;
 
@@ -75,45 +77,64 @@
         }
     }
 
+    static class MapperData<T, K> {
+        Mapper<T, K> m;
+        int expectedSize;
+
+        MapperData(Mapper<T, K> m, int expectedSize) {
+            this.m = m;
+            this.expectedSize = expectedSize;
+        }
+    }
+
+    List<MapperData<Integer, ?>> getMapperData(TestData<Integer> data) {
+        int uniqueSize = data.into(new HashSet<>()).size();
+
+        return Arrays.<MapperData<Integer, ?>>asList(
+            new MapperData<>(mId, uniqueSize),
+            new MapperData<>(mZero, Math.min(1, data.size())),
+            new MapperData<>(mDoubler, uniqueSize),
+            new MapperData<>(mId.compose(mDoubler), uniqueSize),
+            new MapperData<>(mDoubler.compose(mDoubler), uniqueSize),
+
+            new MapperData<>(Mappers.forPredicate(pFalse, true, false), Math.min(1, uniqueSize)),
+            new MapperData<>(Mappers.forPredicate(pTrue, true, false), Math.min(1, uniqueSize)),
+            new MapperData<>(Mappers.forPredicate(pEven, true, false), Math.min(2, uniqueSize)),
+            new MapperData<>(Mappers.forPredicate(pOdd, true, false), Math.min(2, uniqueSize))
+        );
+    }
+
     @Test(dataProvider = "opArrays", dataProviderClass = StreamTestDataProvider.class)
     public void testOps(String name, TestData<Integer> data) {
         // @@@ More things to test here:
         //     - Every value in data is present in right bucket
         //     - Total number of values equals size of data
 
-        int uniqueSize = data.into(new HashSet<Integer>()).size();
+        for (MapperData<Integer, ?> md : getMapperData(data)) {
+            Map<?, Collection<Integer>> result = exerciseOps(data,
+                                                             new GroupByOp<Integer, Object>(md.m));
+            assertEquals(result.keySet().size(), md.expectedSize);
+        }
+    }
 
-        Map<Integer, Collection<Integer>> miResult = exerciseGroupByOps(data, new GroupByOp<>(mId));
-        assertEquals(miResult.keySet().size(), uniqueSize);
+    @Test(dataProvider = "opArrays", dataProviderClass = StreamTestDataProvider.class)
+    public void testWithUnorderedOp(String name, TestData<Integer> data) {
+        // @@@ More things to test here:
+        //     - Every value in data is present in right bucket
+        //     - Total number of values equals size of data
 
-        miResult = exerciseGroupByOps(data, new GroupByOp<>(mZero));
-        assertEquals(miResult.keySet().size(), Math.min(1, data.size()));
-
-        miResult = exerciseGroupByOps(data, new GroupByOp<>(mDoubler));
-        assertEquals(miResult.keySet().size(), uniqueSize);
-
-        miResult = exerciseGroupByOps(data, new GroupByOp<>(mId.compose(mDoubler)));
-        assertEquals(miResult.keySet().size(), uniqueSize);
-
-        miResult = exerciseGroupByOps(data, new GroupByOp<>(mDoubler.compose(mDoubler)));
-        assertEquals(miResult.keySet().size(), uniqueSize);
-
-        Map<Boolean, Collection<Integer>> mbResult = exerciseGroupByOps(data, new GroupByOp<>(Mappers.forPredicate(pFalse, true, false)));
-        assertEquals(mbResult.keySet().size(), Math.min(1, uniqueSize));
-
-        mbResult = exerciseGroupByOps(data, new GroupByOp<>(Mappers.forPredicate(pTrue, true, false)));
-        assertEquals(mbResult.keySet().size(), Math.min(1, uniqueSize));
-
-        mbResult = exerciseGroupByOps(data, new GroupByOp<>(Mappers.forPredicate(pEven, true, false)));
-        assertEquals(mbResult.keySet().size(), Math.min(2, uniqueSize));
-
-        mbResult = exerciseGroupByOps(data, new GroupByOp<>(Mappers.forPredicate(pOdd, true, false)));
-        assertEquals(mbResult.keySet().size(), Math.min(2, uniqueSize));
+        for (MapperData<Integer, ?> md : getMapperData(data)) {
+            Map<?, Collection<Integer>> result = exerciseOps(data,
+                                                             this::multiMapEquals,
+                                                             new GroupByOp<Integer, Object>(md.m),
+                                                             new FlagDeclaringOp<>(StreamOpFlags.NOT_ORDERED));
+            assertEquals(result.keySet().size(), md.expectedSize);
+        }
     }
 
     @Test(dataProvider = "opArrays", dataProviderClass = StreamTestDataProvider.class)
     public void testWithValueFactory(String name, TestData<Integer> data) {
-        Map<Integer, Collection<Integer>> miResult = exerciseGroupByOps(data, new GroupByOp<Integer, Integer>(mId, LinkedList::new));
+        Map<Integer, Collection<Integer>> miResult = exerciseOps(data, new GroupByOp<Integer, Integer>(mId, LinkedList::new));
 
         Set<Class<?>> classes = miResult.values().stream().map(e -> e.getClass()).into(new HashSet<Class<?>>());
 
@@ -123,10 +144,6 @@
         }
     }
 
-    <T, K> Map<K, Collection<T>> exerciseGroupByOps(TestData<T> data, GroupByOp<T, K> gbop) {
-        return exerciseOps(data, this::multiMapEquals, gbop);
-    }
-
     <K, V> boolean multiMapEquals(Map<K, Collection<V>> a, Map<K, Collection<V>> b) {
         if (!Objects.equals(a.keySet(), b.keySet())) {
             return false;
--- a/test-ng/tests/org/openjdk/tests/java/util/streams/ops/StreamOpTestCase.java	Mon Oct 22 13:07:08 2012 -0400
+++ b/test-ng/tests/org/openjdk/tests/java/util/streams/ops/StreamOpTestCase.java	Mon Oct 22 17:04:59 2012 -0400
@@ -660,7 +660,7 @@
         @Override
         @SuppressWarnings({ "rawtypes", "unchecked" })
         public AbstractPipeline<?, T> seq() {
-            return (AbstractPipeline<?, T>) Streams.stream(collection, -1);
+            return (AbstractPipeline<?, T>) Streams.stream(collection, -1, StreamOpFlags.IS_ORDERED);
         }
     }
 }