changeset 8890:ad41f42ffd4e

Let the task help out trying to complete rather than fork if aggressive forking would unduly increase memory usage and potentially result in out of memeory errors under reasonable maximum heap sizes.
author psandoz
date Fri, 21 Jun 2013 11:03:27 +0200
parents 8c51cc1a84e6
children bc8ab76ab484 2029f6c93b17
files src/share/classes/java/util/stream/AbstractTask.java
diffstat 1 files changed, 33 insertions(+), 1 deletions(-) [+]
line wrap: on
line diff
--- a/src/share/classes/java/util/stream/AbstractTask.java	Fri Jun 21 11:03:26 2013 +0200
+++ b/src/share/classes/java/util/stream/AbstractTask.java	Fri Jun 21 11:03:27 2013 +0200
@@ -306,9 +306,31 @@
     public final void compute() {
         @SuppressWarnings("unchecked")
         K task = (K) this;
+        long parentEstimate = task.spliterator.estimateSize();
+        long estimate = 0;
         while (task.canCompute()) {
             Spliterator<P_IN> split;
-            if (!task.suggestSplit() || (split = task.spliterator.trySplit()) == null) {
+            // The task will help out executing other tasks in the computation
+            // if the following conditions are met:
+            // - the current (right) split estimate is greater than a threshold.
+            // - the splitting is right-balanced. The left split estimate is
+            //   less than a quarter of the parent split estimate.
+            // - the queued task count in the current thread is greater than
+            //   the parallelism of the pool.
+            // For right-balanced trees created by extracting parallelism from a
+            // sequential source (such as a spliterator created from an
+            // iterator) this task can be too aggressive forking new tasks
+            // that reference arrays of values of increasing size. This can
+            // result very high memory consumption and out of memory errors
+            // unless instead of forking this task helps out if the above
+            // conditions are met.
+            if (estimate > (1 << 24) &&
+                (parentEstimate - estimate) < (parentEstimate >>> 2) &&
+                getQueuedTaskCount() > getPoolParallelism()) {
+                // Help complete attempting to complete just one task then
+                // re-check the conditions to help out some more
+                task.helpComplete(1);
+            } else if (!task.suggestSplit() || (split = task.spliterator.trySplit()) == null) {
                 task.setLocalResult(task.doLeaf());
                 task.tryComplete();
                 return;
@@ -320,10 +342,20 @@
                 task.setPendingCount(1);
                 l.fork();
                 task = r;
+                parentEstimate = estimate;
+                estimate = task.spliterator.estimateSize();
             }
         }
     }
 
+    private int getPoolParallelism() {
+        ForkJoinPool p = getPool();
+        if (p == null) {
+            p = ForkJoinPool.commonPool();
+        }
+        return p.getParallelism();
+    }
+
     /**
      * {@inheritDoc}
      *