changeset 6924:46124e93c85a

More renames
author briangoetz
date Wed, 09 Jan 2013 17:07:17 -0500
parents 4972b0802678
children 80320902a94f
files src/share/classes/java/util/stream/ConcurrentCollectors.java src/share/classes/java/util/stream/ConcurrentReducers.java
diffstat 2 files changed, 275 insertions(+), 275 deletions(-) [+]
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/share/classes/java/util/stream/ConcurrentCollectors.java	Wed Jan 09 17:07:17 2013 -0500
@@ -0,0 +1,275 @@
+/*
+ * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.  Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+package java.util.stream;
+
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.BinaryOperator;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+
+/**
+ * ConcurrentReducers
+ *
+ * @author Brian Goetz
+ */
+public class ConcurrentCollectors {
+    private static abstract class ConcurrentMapCollector<T, K, D, M extends ConcurrentMap<K, D>> implements Collector<T, M> {
+        private final Supplier<M> mapFactory;
+        private final BinaryOperator<D> mergeFunction;
+
+        protected ConcurrentMapCollector(Supplier<M> mapFactory, BinaryOperator<D> function) {
+            this.mapFactory = mapFactory;
+            mergeFunction = function;
+        }
+
+        @Override
+        public boolean isConcurrent() {
+            return true;
+        }
+
+        @Override
+        public M makeResult() {
+            return mapFactory.get();
+        }
+
+        @Override
+        public M combine(M map, M other) {
+            for (Map.Entry<K, D> e : other.entrySet()) {
+                K key = e.getKey();
+                map.put(key, !map.containsKey(key) ? e.getValue() : mergeFunction.apply(map.get(key), e.getValue()));
+            }
+            return map;
+        }
+    }
+
+    public static <T, K>
+    Collector<T, ConcurrentMap<K, Collection<T>>> groupBy(Function<? super T, ? extends K> classifier) {
+        return ConcurrentCollectors.<T, K, Collection<T>, ConcurrentMap<K, Collection<T>>>
+                groupBy(classifier, ConcurrentHashMap::new, ArrayList::new);
+    }
+
+    public static <T, K, C extends Collection<T>, M extends ConcurrentMap<K, C>>
+    Collector<T, M> groupBy(Function<? super T, ? extends K> classifier,
+                          Supplier<M> mapFactory,
+                          Supplier<C> rowFactory) {
+        return groupBy(classifier, mapFactory, Collectors.toCollection(rowFactory));
+    }
+
+    public static <T, K, D> Collector<T, ConcurrentMap<K, D>> groupBy(Function<? super T, ? extends K> classifier,
+                                                                    Collector<T, D> downstream) {
+        return groupBy(classifier, (Supplier<ConcurrentMap<K,D>>) ConcurrentHashMap::new, downstream);
+    }
+
+    static <T, K, D, M extends ConcurrentMap<K, D>> Collector<T, M> groupBy(Function<? super T, ? extends K> classifier,
+                                                                          Supplier<M> mapFactory,
+                                                                          Collector<T, D> downstream) {
+        return new ConcurrentMapCollector<T, K, D, M>(mapFactory, downstream::combine) {
+            @Override
+            public void accumulate(M map, T t) {
+                D container = map.computeIfAbsent(classifier.apply(t), k -> downstream.makeResult());
+                if (downstream.isConcurrent()) {
+                    downstream.accumulate(container, t);
+                }
+                else {
+                    synchronized (container) {
+                        downstream.accumulate(container, t);
+                    }
+                }
+            }
+        };
+    }
+
+    public static <T, K> Collector<T, ConcurrentMap<K,T>> groupBy(Function<? super T, ? extends K> classifier,
+                                                                BinaryOperator<T> reducer) {
+        return groupBy(classifier, ConcurrentHashMap<K,T>::new, (Function<T,T>) e -> e, reducer);
+    }
+
+    public static <T, K, M extends ConcurrentMap<K, T>>
+    Collector<T, M> groupBy(Function<? super T, ? extends K> classifier,
+                          Supplier<M> mapFactory,
+                          BinaryOperator<T> reducer) {
+        return groupBy(classifier, mapFactory, (Function<T,T>) e -> e, reducer);
+    }
+
+    public static <T, K, D> Collector<T, ConcurrentMap<K,D>> groupBy(Function<? super T, ? extends K> classifier,
+                                                         Function<? super T, ? extends D> mapper,
+                                                         BinaryOperator<D> reducer) {
+        return groupBy(classifier, (Supplier<ConcurrentMap<K,D>>) ConcurrentHashMap::new, mapper, reducer);
+    }
+
+    public static <T, K, D, M extends ConcurrentMap<K, D>>
+    Collector<T, M> groupBy(Function<? super T, ? extends K> classifier,
+                          Supplier<M> mapFactory,
+                          Function<? super T, ? extends D> mapper,
+                          BinaryOperator<D> reducer) {
+        return new ConcurrentMapCollector<T, K, D, M>(mapFactory, reducer) {
+            @Override
+            public void accumulate(M map, T t) {
+                map.merge(classifier.apply(t), mapper.apply(t), reducer);
+            }
+        };
+    }
+
+
+    public static <T, U> Collector<T, ConcurrentMap<T,U>> mappedTo(Function<? super T, ? extends U> mapper) {
+        return mappedTo(mapper, (BinaryOperator<U>) Collectors.THROWING_MERGER);
+    }
+
+    public static <T, U> Collector<T, ConcurrentMap<T,U>> mappedTo(Function<? super T, ? extends U> mapper,
+                                                                 BinaryOperator<U> mergeFunction) {
+        return mappedTo(mapper, mergeFunction, (Supplier<ConcurrentMap<T,U>>) ConcurrentHashMap::new);
+    }
+
+    public static <T, U, M extends ConcurrentMap<T, U>> Collector<T, M> mappedTo(Function<? super T, ? extends U> mapper,
+                                                                               Supplier<M> mapSupplier) {
+        return mappedTo(mapper, (BinaryOperator<U>) Collectors.THROWING_MERGER, mapSupplier);
+    }
+    public static <T, U, M extends ConcurrentMap<T, U>> Collector<T, M> mappedTo(Function<? super T, ? extends U> mapper,
+                                                                               BinaryOperator<U> mergeFunction,
+                                                                               Supplier<M> mapSupplier) {
+        return new ConcurrentMapCollector<T, T, U, M>(mapSupplier, mergeFunction) {
+            @Override
+            public void accumulate(M map, T value) {
+                map.merge(value, mapper.apply(value), mergeFunction);
+            }
+        };
+    }
+
+    public static class Partition<T> {
+        private T forTrue, forFalse;
+
+        public Partition(Supplier<T> supplier) {
+            this.forFalse = supplier.get();
+            this.forTrue = supplier.get();
+        }
+
+        public Partition(T forTrue, T forFalse) {
+            this.forTrue = forTrue;
+            this.forFalse = forFalse;
+        }
+
+        public T forTrue() {
+            return forTrue;
+        }
+
+        public T forFalse() {
+            return forFalse;
+        }
+    }
+
+    public static<T> Collector<T, Partition<Collection<T>>> partition(Predicate<T> predicate) {
+        return partition(predicate, (Supplier<Collection<T>>) ArrayList::new);
+    }
+
+    public static<T, C extends Collection<T>> Collector<T, Partition<C>> partition(Predicate<T> predicate,
+                                                                                 Supplier<C> rowFactory) {
+        return partition(predicate, Collectors.toCollection(rowFactory));
+    }
+
+    public static<T, D> Collector<T, Partition<D>> partition(Predicate<T> predicate, Collector<T,D> downstream) {
+        return new Collector<T, Partition<D>>() {
+            @Override
+            public Partition<D> makeResult() {
+                return new Partition<>(downstream::makeResult);
+            }
+
+            @Override
+            public boolean isConcurrent() {
+                return true;
+            }
+
+            @Override
+            public void accumulate(Partition<D> result, T value) {
+                D container = predicate.test(value) ? result.forTrue() : result.forFalse();
+                if (downstream.isConcurrent()) {
+                    downstream.accumulate(container, value);
+                }
+                else {
+                    synchronized(container) {
+                        downstream.accumulate(container, value);
+                    }
+                }
+            }
+
+            @Override
+            public Partition<D> combine(Partition<D> result, Partition<D> other) {
+                return new Partition<>(downstream.combine(result.forTrue(), other.forTrue()),
+                                       downstream.combine(result.forFalse(), other.forFalse()));
+            }
+        };
+    }
+
+    public static<T> Collector<T, Partition<T>> partition(Predicate<T> predicate,
+                                                        T identity,
+                                                        BinaryOperator<T> reducer) {
+        return partition(predicate, identity, (Function<T,T>) e -> e, reducer);
+    }
+
+    public static<T, U> Collector<T, Partition<U>> partition(Predicate<T> predicate,
+                                                           U identity,
+                                                           Function<T, U> mapper,
+                                                           BinaryOperator<U> reducer) {
+        return new Collector<T, Partition<U>>() {
+            private final Object trueLock = new Object();
+            private final Object falseLock = new Object();
+
+            @Override
+            public Partition<U> makeResult() {
+                return new Partition<>(identity, identity);
+            }
+
+            @Override
+            public boolean isConcurrent() {
+                return true;
+            }
+
+            @Override
+            public void accumulate(Partition<U> result, T value) {
+                U mapped = mapper.apply(value);
+                if (predicate.test(value)) {
+                    synchronized (trueLock) {
+                        result.forTrue = reducer.apply(result.forTrue(), mapped);
+                    }
+                }
+                else {
+                    synchronized (falseLock) {
+                        result.forFalse = reducer.apply(result.forFalse(), mapped);
+                    }
+                }
+            }
+
+            @Override
+            public Partition<U> combine(Partition<U> result, Partition<U> other) {
+                result.forTrue = reducer.apply(result.forTrue(), other.forTrue());
+                result.forFalse = reducer.apply(result.forFalse(), other.forFalse());
+                return result;
+            }
+        };
+    }
+
+}
--- a/src/share/classes/java/util/stream/ConcurrentReducers.java	Wed Jan 09 16:32:17 2013 -0500
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,275 +0,0 @@
-/*
- * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved.
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This code is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 only, as
- * published by the Free Software Foundation.  Oracle designates this
- * particular file as subject to the "Classpath" exception as provided
- * by Oracle in the LICENSE file that accompanied this code.
- *
- * This code is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
- * version 2 for more details (a copy is included in the LICENSE file that
- * accompanied this code).
- *
- * You should have received a copy of the GNU General Public License version
- * 2 along with this work; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
- *
- * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
- * or visit www.oracle.com if you need additional information or have any
- * questions.
- */
-package java.util.stream;
-
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.function.BinaryOperator;
-import java.util.function.Function;
-import java.util.function.Predicate;
-import java.util.function.Supplier;
-
-/**
- * ConcurrentReducers
- *
- * @author Brian Goetz
- */
-public class ConcurrentReducers {
-    private static abstract class ConcurrentMapCollector<T, K, D, M extends ConcurrentMap<K, D>> implements Collector<T, M> {
-        private final Supplier<M> mapFactory;
-        private final BinaryOperator<D> mergeFunction;
-
-        protected ConcurrentMapCollector(Supplier<M> mapFactory, BinaryOperator<D> function) {
-            this.mapFactory = mapFactory;
-            mergeFunction = function;
-        }
-
-        @Override
-        public boolean isConcurrent() {
-            return true;
-        }
-
-        @Override
-        public M makeResult() {
-            return mapFactory.get();
-        }
-
-        @Override
-        public M combine(M map, M other) {
-            for (Map.Entry<K, D> e : other.entrySet()) {
-                K key = e.getKey();
-                map.put(key, !map.containsKey(key) ? e.getValue() : mergeFunction.apply(map.get(key), e.getValue()));
-            }
-            return map;
-        }
-    }
-
-    public static <T, K>
-    Collector<T, ConcurrentMap<K, Collection<T>>> groupBy(Function<? super T, ? extends K> classifier) {
-        return ConcurrentReducers.<T, K, Collection<T>, ConcurrentMap<K, Collection<T>>>
-                groupBy(classifier, ConcurrentHashMap::new, ArrayList::new);
-    }
-
-    public static <T, K, C extends Collection<T>, M extends ConcurrentMap<K, C>>
-    Collector<T, M> groupBy(Function<? super T, ? extends K> classifier,
-                          Supplier<M> mapFactory,
-                          Supplier<C> rowFactory) {
-        return groupBy(classifier, mapFactory, Collectors.toCollection(rowFactory));
-    }
-
-    public static <T, K, D> Collector<T, ConcurrentMap<K, D>> groupBy(Function<? super T, ? extends K> classifier,
-                                                                    Collector<T, D> downstream) {
-        return groupBy(classifier, (Supplier<ConcurrentMap<K,D>>) ConcurrentHashMap::new, downstream);
-    }
-
-    static <T, K, D, M extends ConcurrentMap<K, D>> Collector<T, M> groupBy(Function<? super T, ? extends K> classifier,
-                                                                          Supplier<M> mapFactory,
-                                                                          Collector<T, D> downstream) {
-        return new ConcurrentMapCollector<T, K, D, M>(mapFactory, downstream::combine) {
-            @Override
-            public void accumulate(M map, T t) {
-                D container = map.computeIfAbsent(classifier.apply(t), k -> downstream.makeResult());
-                if (downstream.isConcurrent()) {
-                    downstream.accumulate(container, t);
-                }
-                else {
-                    synchronized (container) {
-                        downstream.accumulate(container, t);
-                    }
-                }
-            }
-        };
-    }
-
-    public static <T, K> Collector<T, ConcurrentMap<K,T>> groupBy(Function<? super T, ? extends K> classifier,
-                                                                BinaryOperator<T> reducer) {
-        return groupBy(classifier, ConcurrentHashMap<K,T>::new, (Function<T,T>) e -> e, reducer);
-    }
-
-    public static <T, K, M extends ConcurrentMap<K, T>>
-    Collector<T, M> groupBy(Function<? super T, ? extends K> classifier,
-                          Supplier<M> mapFactory,
-                          BinaryOperator<T> reducer) {
-        return groupBy(classifier, mapFactory, (Function<T,T>) e -> e, reducer);
-    }
-
-    public static <T, K, D> Collector<T, ConcurrentMap<K,D>> groupBy(Function<? super T, ? extends K> classifier,
-                                                         Function<? super T, ? extends D> mapper,
-                                                         BinaryOperator<D> reducer) {
-        return groupBy(classifier, (Supplier<ConcurrentMap<K,D>>) ConcurrentHashMap::new, mapper, reducer);
-    }
-
-    public static <T, K, D, M extends ConcurrentMap<K, D>>
-    Collector<T, M> groupBy(Function<? super T, ? extends K> classifier,
-                          Supplier<M> mapFactory,
-                          Function<? super T, ? extends D> mapper,
-                          BinaryOperator<D> reducer) {
-        return new ConcurrentMapCollector<T, K, D, M>(mapFactory, reducer) {
-            @Override
-            public void accumulate(M map, T t) {
-                map.merge(classifier.apply(t), mapper.apply(t), reducer);
-            }
-        };
-    }
-
-
-    public static <T, U> Collector<T, ConcurrentMap<T,U>> mappedTo(Function<? super T, ? extends U> mapper) {
-        return mappedTo(mapper, (BinaryOperator<U>) Collectors.THROWING_MERGER);
-    }
-
-    public static <T, U> Collector<T, ConcurrentMap<T,U>> mappedTo(Function<? super T, ? extends U> mapper,
-                                                                 BinaryOperator<U> mergeFunction) {
-        return mappedTo(mapper, mergeFunction, (Supplier<ConcurrentMap<T,U>>) ConcurrentHashMap::new);
-    }
-
-    public static <T, U, M extends ConcurrentMap<T, U>> Collector<T, M> mappedTo(Function<? super T, ? extends U> mapper,
-                                                                               Supplier<M> mapSupplier) {
-        return mappedTo(mapper, (BinaryOperator<U>) Collectors.THROWING_MERGER, mapSupplier);
-    }
-    public static <T, U, M extends ConcurrentMap<T, U>> Collector<T, M> mappedTo(Function<? super T, ? extends U> mapper,
-                                                                               BinaryOperator<U> mergeFunction,
-                                                                               Supplier<M> mapSupplier) {
-        return new ConcurrentMapCollector<T, T, U, M>(mapSupplier, mergeFunction) {
-            @Override
-            public void accumulate(M map, T value) {
-                map.merge(value, mapper.apply(value), mergeFunction);
-            }
-        };
-    }
-
-    public static class Partition<T> {
-        private T forTrue, forFalse;
-
-        public Partition(Supplier<T> supplier) {
-            this.forFalse = supplier.get();
-            this.forTrue = supplier.get();
-        }
-
-        public Partition(T forTrue, T forFalse) {
-            this.forTrue = forTrue;
-            this.forFalse = forFalse;
-        }
-
-        public T forTrue() {
-            return forTrue;
-        }
-
-        public T forFalse() {
-            return forFalse;
-        }
-    }
-
-    public static<T> Collector<T, Partition<Collection<T>>> partition(Predicate<T> predicate) {
-        return partition(predicate, (Supplier<Collection<T>>) ArrayList::new);
-    }
-
-    public static<T, C extends Collection<T>> Collector<T, Partition<C>> partition(Predicate<T> predicate,
-                                                                                 Supplier<C> rowFactory) {
-        return partition(predicate, Collectors.toCollection(rowFactory));
-    }
-
-    public static<T, D> Collector<T, Partition<D>> partition(Predicate<T> predicate, Collector<T,D> downstream) {
-        return new Collector<T, Partition<D>>() {
-            @Override
-            public Partition<D> makeResult() {
-                return new Partition<>(downstream::makeResult);
-            }
-
-            @Override
-            public boolean isConcurrent() {
-                return true;
-            }
-
-            @Override
-            public void accumulate(Partition<D> result, T value) {
-                D container = predicate.test(value) ? result.forTrue() : result.forFalse();
-                if (downstream.isConcurrent()) {
-                    downstream.accumulate(container, value);
-                }
-                else {
-                    synchronized(container) {
-                        downstream.accumulate(container, value);
-                    }
-                }
-            }
-
-            @Override
-            public Partition<D> combine(Partition<D> result, Partition<D> other) {
-                return new Partition<>(downstream.combine(result.forTrue(), other.forTrue()),
-                                       downstream.combine(result.forFalse(), other.forFalse()));
-            }
-        };
-    }
-
-    public static<T> Collector<T, Partition<T>> partition(Predicate<T> predicate,
-                                                        T identity,
-                                                        BinaryOperator<T> reducer) {
-        return partition(predicate, identity, (Function<T,T>) e -> e, reducer);
-    }
-
-    public static<T, U> Collector<T, Partition<U>> partition(Predicate<T> predicate,
-                                                           U identity,
-                                                           Function<T, U> mapper,
-                                                           BinaryOperator<U> reducer) {
-        return new Collector<T, Partition<U>>() {
-            private final Object trueLock = new Object();
-            private final Object falseLock = new Object();
-
-            @Override
-            public Partition<U> makeResult() {
-                return new Partition<>(identity, identity);
-            }
-
-            @Override
-            public boolean isConcurrent() {
-                return true;
-            }
-
-            @Override
-            public void accumulate(Partition<U> result, T value) {
-                U mapped = mapper.apply(value);
-                if (predicate.test(value)) {
-                    synchronized (trueLock) {
-                        result.forTrue = reducer.apply(result.forTrue(), mapped);
-                    }
-                }
-                else {
-                    synchronized (falseLock) {
-                        result.forFalse = reducer.apply(result.forFalse(), mapped);
-                    }
-                }
-            }
-
-            @Override
-            public Partition<U> combine(Partition<U> result, Partition<U> other) {
-                result.forTrue = reducer.apply(result.forTrue(), other.forTrue());
-                result.forFalse = reducer.apply(result.forFalse(), other.forFalse());
-                return result;
-            }
-        };
-    }
-
-}