changeset 329:9a849898ff24

Runners: Refactor out TestExecutor from the shared code.
author shade
date Thu, 20 Oct 2016 15:46:13 +0200
parents 01fb61958140
children 07d859cba98d
files jcstress-core/src/main/java/org/openjdk/jcstress/ForkedMain.java jcstress-core/src/main/java/org/openjdk/jcstress/JCStress.java jcstress-core/src/main/java/org/openjdk/jcstress/TestExecutor.java jcstress-core/src/main/java/org/openjdk/jcstress/infra/EndResult.java jcstress-core/src/main/java/org/openjdk/jcstress/infra/Scheduler.java jcstress-core/src/main/java/org/openjdk/jcstress/infra/runners/TestConfig.java
diffstat 6 files changed, 179 insertions(+), 233 deletions(-) [+]
line wrap: on
line diff
--- a/jcstress-core/src/main/java/org/openjdk/jcstress/ForkedMain.java	Thu Oct 20 15:22:13 2016 +0200
+++ b/jcstress-core/src/main/java/org/openjdk/jcstress/ForkedMain.java	Thu Oct 20 15:46:13 2016 +0200
@@ -56,7 +56,7 @@
         Runtime.getRuntime().addShutdownHook(new CloseBinaryLinkHook(link));
 
         TestConfig config = link.nextJob(token);
-        new JCStress(null).runEmbedded(config, link);
+        new TestExecutor(0, link, false).runEmbedded(config);
     }
 
     /**
--- a/jcstress-core/src/main/java/org/openjdk/jcstress/JCStress.java	Thu Oct 20 15:22:13 2016 +0200
+++ b/jcstress-core/src/main/java/org/openjdk/jcstress/JCStress.java	Thu Oct 20 15:46:13 2016 +0200
@@ -24,29 +24,19 @@
  */
 package org.openjdk.jcstress;
 
-import org.openjdk.jcstress.infra.Scheduler;
-import org.openjdk.jcstress.infra.Status;
 import org.openjdk.jcstress.infra.TestInfo;
 import org.openjdk.jcstress.infra.collectors.*;
 import org.openjdk.jcstress.infra.grading.ConsoleReportPrinter;
 import org.openjdk.jcstress.infra.grading.ExceptionReportPrinter;
 import org.openjdk.jcstress.infra.grading.TextReportPrinter;
 import org.openjdk.jcstress.infra.grading.HTMLReportPrinter;
-import org.openjdk.jcstress.infra.runners.Runner;
 import org.openjdk.jcstress.infra.runners.TestConfig;
 import org.openjdk.jcstress.infra.runners.TestList;
-import org.openjdk.jcstress.link.BinaryLinkServer;
-import org.openjdk.jcstress.util.InputStreamDrainer;
 import org.openjdk.jcstress.vm.VMSupport;
 
 import java.io.*;
 import java.lang.management.ManagementFactory;
-import java.lang.reflect.Constructor;
 import java.util.*;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.regex.Pattern;
 
 /**
@@ -55,53 +45,12 @@
  * @author Aleksey Shipilev (aleksey.shipilev@oracle.com)
  */
 public class JCStress {
-    final ExecutorService pool;
     final PrintStream out;
     final Options opts;
 
     public JCStress(Options opts) {
         this.opts = opts;
-        this.pool = Executors.newCachedThreadPool(new ThreadFactory() {
-            private final AtomicInteger id = new AtomicInteger();
-
-            @Override
-            public Thread newThread(Runnable r) {
-                Thread t = new Thread(r);
-                t.setName("worker" + id.incrementAndGet());
-                t.setDaemon(true);
-                return t;
-            }
-        });
-        out = System.out;
-    }
-
-    class TestCfgTask implements Scheduler.ScheduledTask {
-        private final TestConfig cfg;
-        private final BinaryLinkServer server;
-        private final TestResultCollector sink;
-
-        public TestCfgTask(TestConfig cfg, BinaryLinkServer server, TestResultCollector sink) {
-            this.cfg = cfg;
-            this.server = server;
-            this.sink = sink;
-        }
-
-        @Override
-        public int getTokens() {
-            return cfg.threads;
-        }
-
-        @Override
-        public void run() {
-            switch (cfg.runMode) {
-                case EMBEDDED:
-                    runEmbedded(cfg, sink);
-                    break;
-                case FORKED:
-                    runForked(cfg, server, sink);
-                    break;
-            }
-        }
+        this.out = System.out;
     }
 
     public void run() throws Exception {
@@ -117,16 +66,11 @@
         DiskWriteCollector diskCollector = new DiskWriteCollector(opts.getResultFile());
         TestResultCollector sink = MuxCollector.of(printer, diskCollector);
 
-        BinaryLinkServer server = new BinaryLinkServer(opts.getUserCPUs(), sink);
-
-        Scheduler scheduler = new Scheduler(opts.getUserCPUs());
+        TestExecutor executor = new TestExecutor(opts.getUserCPUs(), sink, true);
         for (TestConfig cfg : configs) {
-            server.addTask(cfg);
-            scheduler.schedule(new TestCfgTask(cfg, server, sink));
+            executor.submit(cfg);
         }
-        scheduler.waitFinish();
-
-        server.terminate();
+        executor.waitFinish();
 
         diskCollector.close();
 
@@ -185,68 +129,6 @@
         return configs;
     }
 
-    void runForked(TestConfig config, BinaryLinkServer server, TestResultCollector collector) {
-        try {
-            List<String> command = new ArrayList<>();
-
-            // basic Java line
-            command.addAll(VMSupport.getJavaInvokeLine());
-
-            // jvm args
-            command.addAll(config.jvmArgs);
-
-            command.add(ForkedMain.class.getName());
-
-            command.add(server.getHost());
-            command.add(String.valueOf(server.getPort()));
-
-            // which config should the forked VM pull?
-            command.add(String.valueOf(config.uniqueToken));
-
-            ProcessBuilder pb = new ProcessBuilder(command);
-            Process p = pb.start();
-
-            ByteArrayOutputStream baos = new ByteArrayOutputStream();
-
-            InputStreamDrainer errDrainer = new InputStreamDrainer(p.getErrorStream(), baos);
-            InputStreamDrainer outDrainer = new InputStreamDrainer(p.getInputStream(), baos);
-
-            errDrainer.start();
-            outDrainer.start();
-
-            int ecode = p.waitFor();
-
-            errDrainer.join();
-            outDrainer.join();
-
-            if (ecode != 0) {
-                // Test had failed, record this.
-                TestResult result = new TestResult(config, Status.VM_ERROR, -1);
-                result.addAuxData(new String(baos.toByteArray()).trim());
-                collector.add(result);
-            }
-        } catch (IOException | InterruptedException ex) {
-            ex.printStackTrace();
-        }
-    }
-
-    public void runEmbedded(TestConfig config, TestResultCollector collector) {
-        try {
-            Class<?> aClass = Class.forName(config.generatedRunnerName);
-            Constructor<?> cnstr = aClass.getConstructor(TestConfig.class, TestResultCollector.class, ExecutorService.class);
-            Runner<?> o = (Runner<?>) cnstr.newInstance(config, collector, pool);
-            o.run();
-        } catch (ClassFormatError e) {
-            TestResult result = new TestResult(config, Status.API_MISMATCH, 0);
-            result.addAuxData(e.getMessage());
-            collector.add(result);
-        } catch (Exception ex) {
-            TestResult result = new TestResult(config, Status.TEST_ERROR, 0);
-            result.addAuxData(ex.getMessage());
-            collector.add(result);
-        }
-    }
-
     public SortedSet<String> getTests() {
         String filter = opts.getTestFilter();
         SortedSet<String> s = new TreeSet<>();
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/jcstress-core/src/main/java/org/openjdk/jcstress/TestExecutor.java	Thu Oct 20 15:46:13 2016 +0200
@@ -0,0 +1,174 @@
+/*
+ * Copyright (c) 2014, 2015, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.  Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+package org.openjdk.jcstress;
+
+import org.openjdk.jcstress.infra.Status;
+import org.openjdk.jcstress.infra.collectors.TestResult;
+import org.openjdk.jcstress.infra.collectors.TestResultCollector;
+import org.openjdk.jcstress.infra.runners.Runner;
+import org.openjdk.jcstress.infra.runners.TestConfig;
+import org.openjdk.jcstress.link.BinaryLinkServer;
+import org.openjdk.jcstress.util.InputStreamDrainer;
+import org.openjdk.jcstress.vm.VMSupport;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class TestExecutor {
+
+    private final ExecutorService pool;
+    private final Semaphore semaphore;
+    private final BinaryLinkServer server;
+    private final int maxThreads;
+    private final TestResultCollector sink;
+
+    public TestExecutor(int maxThreads, TestResultCollector sink, boolean possiblyForked) throws IOException {
+        this.maxThreads = maxThreads;
+        this.sink = sink;
+        this.pool = Executors.newCachedThreadPool(new ThreadFactory() {
+            private final AtomicInteger id = new AtomicInteger();
+
+            @Override
+            public Thread newThread(Runnable r) {
+                Thread t = new Thread(r);
+                t.setName("jcstress-worker-" + id.incrementAndGet());
+                t.setDaemon(true);
+                return t;
+            }
+        });
+
+        if (possiblyForked) {
+            semaphore = new Semaphore(maxThreads);
+            server = new BinaryLinkServer(maxThreads, sink);
+        } else {
+            semaphore = null;
+            server = null;
+        }
+    }
+
+    void runForked(TestConfig config) {
+        try {
+            List<String> command = new ArrayList<>();
+
+            // basic Java line
+            command.addAll(VMSupport.getJavaInvokeLine());
+
+            // jvm args
+            command.addAll(config.jvmArgs);
+
+            command.add(ForkedMain.class.getName());
+
+            command.add(server.getHost());
+            command.add(String.valueOf(server.getPort()));
+
+            // which config should the forked VM pull?
+            command.add(String.valueOf(config.uniqueToken));
+
+            ProcessBuilder pb = new ProcessBuilder(command);
+            Process p = pb.start();
+
+            ByteArrayOutputStream baos = new ByteArrayOutputStream();
+
+            InputStreamDrainer errDrainer = new InputStreamDrainer(p.getErrorStream(), baos);
+            InputStreamDrainer outDrainer = new InputStreamDrainer(p.getInputStream(), baos);
+
+            errDrainer.start();
+            outDrainer.start();
+
+            int ecode = p.waitFor();
+
+            errDrainer.join();
+            outDrainer.join();
+
+            if (ecode != 0) {
+                // Test had failed, record this.
+                TestResult result = new TestResult(config, Status.VM_ERROR, -1);
+                result.addAuxData(new String(baos.toByteArray()).trim());
+                sink.add(result);
+            }
+        } catch (IOException | InterruptedException ex) {
+            ex.printStackTrace();
+        }
+    }
+
+    public void submit(TestConfig cfg) throws InterruptedException {
+        if (server == null || semaphore == null) {
+            throw new IllegalStateException("Embedded runner cannot accept tasks");
+        }
+
+        // Make fat tasks bypass in exclusive mode:
+        final int threads = Math.min(cfg.threads, maxThreads);
+        semaphore.acquire(threads);
+
+        pool.submit(() -> {
+            try {
+                switch (cfg.runMode) {
+                    case EMBEDDED:
+                        runEmbedded(cfg);
+                        break;
+                    case FORKED:
+                        server.addTask(cfg);
+                        runForked(cfg);
+                        break;
+                }
+            } finally {
+                semaphore.release(threads);
+            }
+        });
+    }
+
+    public void waitFinish() throws InterruptedException {
+        if (server == null || semaphore == null) {
+            throw new IllegalStateException("Embedded runner cannot accept tasks");
+        }
+        semaphore.acquire(maxThreads);
+        pool.shutdown();
+        pool.awaitTermination(1, TimeUnit.DAYS);
+        server.terminate();
+    }
+
+    public void runEmbedded(TestConfig config) {
+        try {
+            Class<?> aClass = Class.forName(config.generatedRunnerName);
+            Constructor<?> cnstr = aClass.getConstructor(TestConfig.class, TestResultCollector.class, ExecutorService.class);
+            Runner<?> o = (Runner<?>) cnstr.newInstance(config, sink, pool);
+            o.run();
+        } catch (ClassFormatError e) {
+            TestResult result = new TestResult(config, Status.API_MISMATCH, 0);
+            result.addAuxData(e.getMessage());
+            sink.add(result);
+        } catch (Exception ex) {
+            TestResult result = new TestResult(config, Status.TEST_ERROR, 0);
+            result.addAuxData(ex.getMessage());
+            sink.add(result);
+        }
+    }
+
+}
--- a/jcstress-core/src/main/java/org/openjdk/jcstress/infra/EndResult.java	Thu Oct 20 15:22:13 2016 +0200
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,30 +0,0 @@
-/*
- * Copyright (c) 2005, 2014, Oracle and/or its affiliates. All rights reserved.
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This code is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 only, as
- * published by the Free Software Foundation.  Oracle designates this
- * particular file as subject to the "Classpath" exception as provided
- * by Oracle in the LICENSE file that accompanied this code.
- *
- * This code is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
- * version 2 for more details (a copy is included in the LICENSE file that
- * accompanied this code).
- *
- * You should have received a copy of the GNU General Public License version
- * 2 along with this work; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
- *
- * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
- * or visit www.oracle.com if you need additional information or have any
- * questions.
- */
-package org.openjdk.jcstress.infra;
-
-import java.io.Serializable;
-
-public class EndResult implements Serializable {
-}
--- a/jcstress-core/src/main/java/org/openjdk/jcstress/infra/Scheduler.java	Thu Oct 20 15:22:13 2016 +0200
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,77 +0,0 @@
-/*
- * Copyright (c) 2005, 2014, Oracle and/or its affiliates. All rights reserved.
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This code is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 only, as
- * published by the Free Software Foundation.  Oracle designates this
- * particular file as subject to the "Classpath" exception as provided
- * by Oracle in the LICENSE file that accompanied this code.
- *
- * This code is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
- * version 2 for more details (a copy is included in the LICENSE file that
- * accompanied this code).
- *
- * You should have received a copy of the GNU General Public License version
- * 2 along with this work; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
- *
- * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
- * or visit www.oracle.com if you need additional information or have any
- * questions.
- */
-package org.openjdk.jcstress.infra;
-
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Scheduler.
- *
- * @author Aleksey Shipilev (aleksey.shipilev@oracle.com)
- */
-public class Scheduler {
-
-    private final Semaphore sentinel;
-
-    private final ExecutorService services = Executors.newCachedThreadPool(r -> {
-        Thread t = new Thread(r);
-        t.setPriority(Thread.MAX_PRIORITY);
-        t.setDaemon(true);
-        return t;
-    });
-    private final int totalTokens;
-
-    public Scheduler(int totalTokens) {
-        this.totalTokens = totalTokens;
-        this.sentinel = new Semaphore(totalTokens);
-    }
-
-    public void schedule(final ScheduledTask task) throws InterruptedException {
-        // Make fat tasks bypass in exclusive mode
-        final int tokensAcquired = Math.min(task.getTokens(), totalTokens);
-        sentinel.acquire(tokensAcquired);
-        services.submit(() -> {
-            try {
-                task.run();
-            } finally {
-                sentinel.release(tokensAcquired);
-            }
-        });
-    }
-
-    public void waitFinish() throws InterruptedException {
-        services.shutdown();
-        services.awaitTermination(1, TimeUnit.DAYS);
-    }
-
-    public interface ScheduledTask extends Runnable {
-        int getTokens();
-    }
-
-}
--- a/jcstress-core/src/main/java/org/openjdk/jcstress/infra/runners/TestConfig.java	Thu Oct 20 15:22:13 2016 +0200
+++ b/jcstress-core/src/main/java/org/openjdk/jcstress/infra/runners/TestConfig.java	Thu Oct 20 15:46:13 2016 +0200
@@ -29,14 +29,11 @@
 import org.openjdk.jcstress.vm.AllocProfileSupport;
 
 import java.io.Serializable;
-import java.util.Comparator;
 import java.util.List;
 import java.util.function.Consumer;
 
 public class TestConfig implements Serializable {
 
-    public static final Comparator<TestConfig> COMPARATOR_NAME = Comparator.comparing((c) -> c.name);
-
     public final int uniqueToken;
     public final SpinLoopStyle spinLoopStyle;
     public final boolean verbose;