changeset 6331:af4b2dd992d6

Factor out parallel for-each logic into OpUtils, so it can be shared by ForEachOp, GroupByOp, UniqOp
author briangoetz
date Fri, 19 Oct 2012 17:27:49 -0400
parents 4cabb2e857d4
children 752fe6045da0 80332caeac10
files src/share/classes/java/util/Optional.java src/share/classes/java/util/streams/ParallelPipelineHelper.java src/share/classes/java/util/streams/ops/AbstractTask.java src/share/classes/java/util/streams/ops/ForEachOp.java src/share/classes/java/util/streams/ops/GroupByOp.java src/share/classes/java/util/streams/ops/OpUtils.java src/share/classes/java/util/streams/ops/UniqOp.java
diffstat 6 files changed, 141 insertions(+), 97 deletions(-) [+]
line wrap: on
line diff
--- a/src/share/classes/java/util/Optional.java	Fri Oct 19 10:20:13 2012 -0700
+++ b/src/share/classes/java/util/Optional.java	Fri Oct 19 17:27:49 2012 -0400
@@ -25,7 +25,6 @@
 package java.util;
 
 import java.util.functions.Factory;
-import java.util.functions.Mapper;
 
 /**
  * A return object which may or may not contain a value. If a value is present
@@ -74,6 +73,7 @@
      * @param <T> Type of the non-existent value.
      * @return an empty object.
      */
+    @SuppressWarnings("unchecked")
     public static<T> Optional<T> empty() {
         return (Optional<T>) EMPTY;
     }
@@ -159,10 +159,6 @@
         }
     }
 
-    public<V> Optional<V> map(Mapper<T, V> mapper) {
-        return present ? new Optional<>(mapper.map(value)) : Optional.<V>empty();
-    }
-
     @Override
     public boolean equals(Object o) {
         if (this == o) {
--- a/src/share/classes/java/util/streams/ops/AbstractTask.java	Fri Oct 19 10:20:13 2012 -0700
+++ b/src/share/classes/java/util/streams/ops/AbstractTask.java	Fri Oct 19 17:27:49 2012 -0400
@@ -38,8 +38,7 @@
  *
  * @author Brian Goetz
  */
-abstract class AbstractTask<P_IN, P_OUT, R, T
-        extends AbstractTask<P_IN, P_OUT, R, T>>
+abstract class AbstractTask<P_IN, P_OUT, R, T extends AbstractTask<P_IN, P_OUT, R, T>>
         extends CountedCompleter<R> {
     protected final ParallelPipelineHelper<P_IN, P_OUT> helper;
     protected final Spliterator<P_IN> spliterator;
@@ -128,8 +127,7 @@
     }
 }
 
-abstract class ComparableTask<P_IN, P_OUT, R, T
-        extends ComparableTask<P_IN, P_OUT, R, T>>
+abstract class ComparableTask<P_IN, P_OUT, R, T extends ComparableTask<P_IN, P_OUT, R, T>>
         extends AbstractTask<P_IN, P_OUT, R, T>
         implements Comparable<T> {
     protected final int depth;
--- a/src/share/classes/java/util/streams/ops/ForEachOp.java	Fri Oct 19 10:20:13 2012 -0700
+++ b/src/share/classes/java/util/streams/ops/ForEachOp.java	Fri Oct 19 17:27:49 2012 -0400
@@ -71,32 +71,7 @@
 
     @Override
     public <S> Void evaluateParallel(ParallelPipelineHelper<S, T> helper) {
-        return helper.invoke(new ForEachTask<>(helper, helper.wrapSink(sink)));
-    }
-
-    // @@@ Extending AbstractTask here is probably inefficient, since we don't really need to keep track of the structure of the computation tree
-    private static class ForEachTask<S, T> extends AbstractTask<S, T, Void, ForEachTask<S, T>> {
-        private final Sink<S> sink;
-
-        private ForEachTask(ParallelPipelineHelper<S, T> helper, Sink<S> sink) {
-            super(helper);
-            this.sink = sink;
-        }
-
-        private ForEachTask(ForEachTask<S, T> parent, Spliterator<S> spliterator, Sink<S> sink) {
-            super(parent, spliterator);
-            this.sink = sink;
-        }
-
-        @Override
-        protected ForEachTask<S, T> makeChild(Spliterator<S> spliterator) {
-            return new ForEachTask<>(this, spliterator, sink);
-        }
-
-        @Override
-        protected Void doLeaf() {
-            helper.into(spliterator, sink);
-            return null;
-        }
+        OpUtils.forEach(helper, helper.wrapSink(sink));
+        return null;
     }
 }
--- a/src/share/classes/java/util/streams/ops/GroupByOp.java	Fri Oct 19 10:20:13 2012 -0700
+++ b/src/share/classes/java/util/streams/ops/GroupByOp.java	Fri Oct 19 17:27:49 2012 -0400
@@ -102,45 +102,16 @@
             @Override
             public void accept(T t) {
                 K key = Objects.requireNonNull(mapper.map(t), String.format("The element %s cannot be mapped to a null key", t));
-                final NodeBuilder<T> sb = map.computeIfAbsent(key, (k) -> Nodes.makeBuilder());
+                final NodeBuilder<T> sb = map.computeIfAbsent(key, k -> Nodes.makeBuilder());
                 synchronized (sb) {
                     sb.accept(t);
                 }
             }
         });
 
-        GroupByTask<S, T> task = new GroupByTask<>(helper, sinkChain);
-        helper.invoke(task);
+        OpUtils.forEach(helper, sinkChain);
 
         // @@@ Fragile cast, need a better way to switch NodeBuilder into Collection
         return (Map) map;
     }
-
-    // @@@ This is a lot of boiler plate for "spliterator.into(sinkChain)"
-    //     consider abstracting to a static method on AbstractTask that takes a Block<Void>
-    private static class GroupByTask<S, T> extends AbstractTask<S, T, Void, GroupByTask<S, T>> {
-        private final Sink<S> sinkChain;
-
-        private GroupByTask(ParallelPipelineHelper<S, T> helper, Sink<S> sinkChain) {
-            super(helper);
-            this.sinkChain = sinkChain;
-        }
-
-        private GroupByTask(GroupByTask<S, T> parent, Spliterator<S> spliterator) {
-            super(parent, spliterator);
-            this.sinkChain = parent.sinkChain;
-        }
-
-        @Override
-        protected GroupByTask<S, T> makeChild(Spliterator<S> spliterator) {
-            return new GroupByTask<>(this, spliterator);
-        }
-
-        @Override
-        protected Void doLeaf() {
-            super.helper.into(spliterator, sinkChain);
-            return null;
-        }
-    }
-
 }
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/share/classes/java/util/streams/ops/OpUtils.java	Fri Oct 19 17:27:49 2012 -0400
@@ -0,0 +1,133 @@
+/*
+ * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.  Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+package java.util.streams.ops;
+
+import java.util.Optional;
+import java.util.functions.BinaryOperator;
+import java.util.functions.Combiner;
+import java.util.functions.Factory;
+import java.util.streams.ParallelPipelineHelper;
+import java.util.streams.Sink;
+import java.util.streams.Spliterator;
+import java.util.streams.TerminalSink;
+
+/**
+ * OpUtils
+ *
+ * @author Brian Goetz
+ */
+public class OpUtils {
+    private OpUtils() {
+        throw new IllegalStateException("no instances");
+    }
+
+    public static<T, R> TerminalSink<T, R> sink(final Factory<R> baseFactory, final Combiner<R, T, R> reducer) {
+        return new TerminalSink<T, R>() {
+            R state;
+
+            @Override
+            public void begin(int size) {
+                state = baseFactory.make();
+            }
+
+            @Override
+            public R getAndClearState() {
+                try {
+                    return state;
+                }
+                finally {
+                    state = null;
+                }
+            }
+
+            @Override
+            public void accept(T t) {
+                state = reducer.combine(state, t);
+            }
+        };
+    }
+
+    public static<T> TerminalSink<T, Optional<T>> sink(final BinaryOperator<T> reducer) {
+        return new TerminalSink<T, Optional<T>>() {
+            private boolean first = true;
+            private T state;
+
+            @Override
+            public void begin(int size) {
+                first = true;
+            }
+
+            @Override
+            public Optional<T> getAndClearState() {
+                try {
+                    return first ? Optional.<T>empty() : new Optional<>(state);
+                }
+                finally {
+                    state = null;
+                }
+            }
+
+            @Override
+            public void accept(T t) {
+                if (first) {
+                    first = false;
+                    state = t;
+                } else {
+                    state = reducer.operate(state, t);
+                }
+            }
+        };
+    }
+
+    public static<P_IN, P_OUT> void forEach(ParallelPipelineHelper<P_IN, P_OUT> helper, Sink<P_IN> sink) {
+        helper.invoke(new ForEachTask<>(helper, sink));
+    }
+
+    private static class ForEachTask<S, T> extends AbstractTask<S, T, Void, ForEachTask<S, T>> {
+        // @@@ Extending AbstractTask here is probably inefficient, since we don't really need to keep track of the structure of the computation tree
+        private final Sink<S> sink;
+
+        private ForEachTask(ParallelPipelineHelper<S, T> helper, Sink<S> sink) {
+            super(helper);
+            this.sink = sink;
+        }
+
+        private ForEachTask(ForEachTask<S, T> parent, Spliterator<S> spliterator, Sink<S> sink) {
+            super(parent, spliterator);
+            this.sink = sink;
+        }
+
+        @Override
+        protected ForEachTask<S, T> makeChild(Spliterator<S> spliterator) {
+            return new ForEachTask<>(this, spliterator, sink);
+        }
+
+        @Override
+        protected Void doLeaf() {
+            helper.into(spliterator, sink);
+            return null;
+        }
+    }
+}
--- a/src/share/classes/java/util/streams/ops/UniqOp.java	Fri Oct 19 10:20:13 2012 -0700
+++ b/src/share/classes/java/util/streams/ops/UniqOp.java	Fri Oct 19 17:27:49 2012 -0400
@@ -30,7 +30,6 @@
 import java.util.logging.Logger;
 import java.util.streams.ParallelPipelineHelper;
 import java.util.streams.Sink;
-import java.util.streams.Spliterator;
 import java.util.streams.StreamOpFlags;
 
 /**
@@ -138,37 +137,9 @@
             }
         });
 
-        helper.invoke(new UniqTask<>(helper, sinkChain));
+        OpUtils.forEach(helper, sinkChain);
 
         // @@@ Not very efficient
         return Nodes.node((T[])map.keySet().toArray());
     }
-
-    // @@@ This is a lot of boiler plate for "spliterator.into(sinkChain)"
-    //     consider abstracting to a static method on AbstractTask that takes a Block<Void>
-    private static class UniqTask<S, T> extends AbstractTask<S, T, Void, UniqTask<S, T>> {
-        private final Sink<S> sinkChain;
-
-        private UniqTask(ParallelPipelineHelper<S, T> helper, Sink<S> sinkChain) {
-            super(helper);
-            this.sinkChain = sinkChain;
-        }
-
-        private UniqTask(UniqTask<S, T> parent, Spliterator<S> spliterator) {
-            super(parent, spliterator);
-            this.sinkChain = parent.sinkChain;
-        }
-
-        @Override
-        protected UniqTask<S, T> makeChild(Spliterator<S> spliterator) {
-            return new UniqTask<>(this, spliterator);
-        }
-
-        @Override
-        protected Void doLeaf() {
-            helper.into(spliterator, sinkChain);
-            return null;
-        }
-    }
-
 }