changeset 1348:91ff826899a3

7901789: Rework exception/timeout handling scheme: faster failure paths
author shade
date Thu, 15 Sep 2016 11:48:41 +0200
parents 5bc528ec6538
children c4aaa7a112b6
files jmh-core/src/main/java/org/openjdk/jmh/runner/BenchmarkHandler.java
diffstat 1 files changed, 48 insertions(+), 37 deletions(-) [+]
line wrap: on
line diff
--- a/jmh-core/src/main/java/org/openjdk/jmh/runner/BenchmarkHandler.java	Wed Sep 07 20:56:28 2016 +0300
+++ b/jmh-core/src/main/java/org/openjdk/jmh/runner/BenchmarkHandler.java	Thu Sep 15 11:48:41 2016 +0200
@@ -29,10 +29,7 @@
 import org.openjdk.jmh.infra.ThreadParams;
 import org.openjdk.jmh.profile.InternalProfiler;
 import org.openjdk.jmh.profile.ProfilerFactory;
-import org.openjdk.jmh.results.BenchmarkTaskResult;
-import org.openjdk.jmh.results.IterationResult;
-import org.openjdk.jmh.results.IterationResultMetaData;
-import org.openjdk.jmh.results.Result;
+import org.openjdk.jmh.results.*;
 import org.openjdk.jmh.runner.format.OutputFormat;
 import org.openjdk.jmh.runner.options.Options;
 import org.openjdk.jmh.runner.options.TimeValue;
@@ -331,9 +328,10 @@
         startProfilers(benchmarkParams, params);
 
         // submit tasks to threadpool
-        Map<BenchmarkTask, Future<BenchmarkTaskResult>> results = new HashMap<BenchmarkTask, Future<BenchmarkTaskResult>>();
+        List<Future<BenchmarkTaskResult>> completed = new ArrayList<Future<BenchmarkTaskResult>>();
+        CompletionService<BenchmarkTaskResult> srv = new ExecutorCompletionService<BenchmarkTaskResult>(executor);
         for (BenchmarkTask runner : runners) {
-            results.put(runner, executor.submit(runner));
+            srv.submit(runner);
         }
 
         // wait for all workers to transit to measurement
@@ -346,7 +344,14 @@
                 break;
             default:
                 try {
-                    runtime.sleep();
+                    Future<BenchmarkTaskResult> failing = srv.poll(runtime.convertTo(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
+                    if (failing != null) {
+                        // Oops, some task has exited prematurely, without isDone check.
+                        // Must be an exception. Record the failing result, and lift the
+                        // timeout deadline: we care only to exit as fast as possible now.
+                        completed.add(failing);
+                        waitDeadline = System.nanoTime();
+                    }
                 } catch (InterruptedException e) {
                     // regardless...
                 }
@@ -358,40 +363,46 @@
         // wait for all workers to transit to teardown
         control.awaitWarmdownReady();
 
-        // Wait for the result, continuously polling the worker threads.
-        // The abrupt exception in any worker will float up here.
+        // Wait for the result, handling timeouts
+        while (completed.size() < numThreads) {
+            try {
+                long waitFor = Math.max(TimeUnit.MILLISECONDS.toNanos(100), waitDeadline - System.nanoTime());
+                Future<BenchmarkTaskResult> fr = srv.poll(waitFor, TimeUnit.NANOSECONDS);
+                if (fr == null) {
+                    // We are in the timeout mode now, kick all the still running threads.
+                    out.print("(*interrupt*) ");
+                    for (BenchmarkTask task : runners) {
+                        Thread runner = task.runner;
+                        if (runner != null) {
+                            runner.interrupt();
+                        }
+                    }
+                } else {
+                    completed.add(fr);
+                }
+            } catch (InterruptedException ex) {
+                throw new BenchmarkException(ex);
+            }
+        }
+
+        // Process the results: we get here after all worker threads have quit,
+        // either normally or abnormally. This means, Future.get() would never block.
         long allOps = 0;
         long measuredOps = 0;
         IterationResult result;
-
         try {
-            int expected = numThreads;
-            while (expected > 0) {
-                for (Map.Entry<BenchmarkTask, Future<BenchmarkTaskResult>> re : results.entrySet()) {
-                    BenchmarkTask task = re.getKey();
-                    Future<BenchmarkTaskResult> fr = re.getValue();
-                    try {
-                        long waitFor = Math.max(TimeUnit.MILLISECONDS.toNanos(100), waitDeadline - System.nanoTime());
-
-                        BenchmarkTaskResult btr = fr.get(waitFor, TimeUnit.NANOSECONDS);
-                        iterationResults.addAll(btr.getResults());
-                        allOps += btr.getAllOps();
-                        measuredOps += btr.getMeasuredOps();
-                        expected--;
-                    } catch (InterruptedException ex) {
-                        throw new BenchmarkException(ex);
-                    } catch (ExecutionException ex) {
-                        // unwrap: ExecutionException -> Throwable-wrapper -> InvocationTargetException
-                        Throwable cause = ex.getCause().getCause().getCause();
-                        throw new BenchmarkException(cause);
-                    } catch (TimeoutException e) {
-                        // try to kick the thread, if it was already started
-                        Thread runner = task.runner;
-                        if (runner != null) {
-                            out.print("(*interrupt*) ");
-                            runner.interrupt();
-                        }
-                    }
+            for (Future<BenchmarkTaskResult> fr : completed) {
+                try {
+                    BenchmarkTaskResult btr = fr.get();
+                    iterationResults.addAll(btr.getResults());
+                    allOps += btr.getAllOps();
+                    measuredOps += btr.getMeasuredOps();
+                } catch (ExecutionException ex) {
+                    // unwrap: ExecutionException -> Throwable-wrapper -> InvocationTargetException
+                    Throwable cause = ex.getCause().getCause().getCause();
+                    throw new BenchmarkException(cause);
+                } catch (InterruptedException ex) {
+                    throw new BenchmarkException(ex);
                 }
             }
         } finally {