changeset 330:07d859cba98d

Runners: reuse JVMs with tunable batch size, use less threads.
author shade
date Fri, 21 Oct 2016 16:56:01 +0200
parents 9a849898ff24
children 1c029a6b4a20
files jcstress-core/src/main/java/org/openjdk/jcstress/EmbeddedExecutor.java jcstress-core/src/main/java/org/openjdk/jcstress/ForkFailedException.java jcstress-core/src/main/java/org/openjdk/jcstress/ForkedMain.java jcstress-core/src/main/java/org/openjdk/jcstress/JCStress.java jcstress-core/src/main/java/org/openjdk/jcstress/Options.java jcstress-core/src/main/java/org/openjdk/jcstress/TestExecutor.java jcstress-core/src/main/java/org/openjdk/jcstress/infra/runners/TestConfig.java jcstress-core/src/main/java/org/openjdk/jcstress/link/BinaryLinkClient.java jcstress-core/src/main/java/org/openjdk/jcstress/link/BinaryLinkServer.java jcstress-core/src/main/java/org/openjdk/jcstress/link/JobRequestFrame.java jcstress-core/src/main/java/org/openjdk/jcstress/link/ResultsFrame.java
diffstat 11 files changed, 418 insertions(+), 133 deletions(-) [+]
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/jcstress-core/src/main/java/org/openjdk/jcstress/EmbeddedExecutor.java	Fri Oct 21 16:56:01 2016 +0200
@@ -0,0 +1,95 @@
+/*
+ * Copyright (c) 2014, 2015, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.  Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+package org.openjdk.jcstress;
+
+import org.openjdk.jcstress.infra.Status;
+import org.openjdk.jcstress.infra.collectors.TestResult;
+import org.openjdk.jcstress.infra.collectors.TestResultCollector;
+import org.openjdk.jcstress.infra.runners.Runner;
+import org.openjdk.jcstress.infra.runners.TestConfig;
+
+import java.lang.reflect.Constructor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+
+public class EmbeddedExecutor {
+
+    private final ExecutorService pool;
+    private final TestResultCollector sink;
+    private Consumer<TestConfig> onFinish;
+
+    public EmbeddedExecutor(TestResultCollector sink) {
+        this(sink, null);
+    }
+
+    public EmbeddedExecutor(TestResultCollector sink, Consumer<TestConfig> onFinish) {
+        this.sink = sink;
+        this.onFinish = onFinish;
+        pool = Executors.newCachedThreadPool(new ThreadFactory() {
+            private final AtomicInteger id = new AtomicInteger();
+
+            @Override
+            public Thread newThread(Runnable r) {
+                Thread t = new Thread(r);
+                t.setName("jcstress-worker-" + id.incrementAndGet());
+                t.setDaemon(true);
+                return t;
+            }
+        });
+    }
+
+    public void submit(TestConfig config) {
+        pool.submit(task(config));
+    }
+
+    public void run(TestConfig config) {
+        task(config).run();
+    }
+
+    private Runnable task(TestConfig config) {
+        return () -> {
+            try {
+                Class<?> aClass = Class.forName(config.generatedRunnerName);
+                Constructor<?> cnstr = aClass.getConstructor(TestConfig.class, TestResultCollector.class, ExecutorService.class);
+                Runner<?> o = (Runner<?>) cnstr.newInstance(config, sink, pool);
+                o.run();
+            } catch (ClassFormatError e) {
+                TestResult result = new TestResult(config, Status.API_MISMATCH, 0);
+                result.addAuxData(e.getMessage());
+                sink.add(result);
+            } catch (Exception ex) {
+                TestResult result = new TestResult(config, Status.TEST_ERROR, 0);
+                result.addAuxData(ex.getMessage());
+                sink.add(result);
+            }
+            if (onFinish != null) {
+                onFinish.accept(config);
+            }
+        };
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/jcstress-core/src/main/java/org/openjdk/jcstress/ForkFailedException.java	Fri Oct 21 16:56:01 2016 +0200
@@ -0,0 +1,44 @@
+/*
+ * Copyright (c) 2014, 2015, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.  Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+package org.openjdk.jcstress;
+
+import java.util.Collections;
+import java.util.List;
+
+public class ForkFailedException extends RuntimeException {
+    private List<String> info;
+
+    public ForkFailedException(String info) {
+        this.info = Collections.singletonList(info);
+    }
+
+    public ForkFailedException(List<String> info) {
+        this.info = info;
+    }
+
+    public List<String> getInfo() {
+        return info;
+    }
+}
--- a/jcstress-core/src/main/java/org/openjdk/jcstress/ForkedMain.java	Thu Oct 20 15:46:13 2016 +0200
+++ b/jcstress-core/src/main/java/org/openjdk/jcstress/ForkedMain.java	Fri Oct 21 16:56:01 2016 +0200
@@ -50,13 +50,17 @@
 
         String host = args[0];
         int port = Integer.valueOf(args[1]);
-        int token = Integer.valueOf(args[2]);
+        String token = args[2];
 
         BinaryLinkClient link = new BinaryLinkClient(host, port);
         Runtime.getRuntime().addShutdownHook(new CloseBinaryLinkHook(link));
 
-        TestConfig config = link.nextJob(token);
-        new TestExecutor(0, link, false).runEmbedded(config);
+        EmbeddedExecutor executor = new EmbeddedExecutor(result -> link.addResult(token, result));
+
+        TestConfig config;
+        while ((config = link.nextJob(token)) != null) {
+            executor.run(config);
+        }
     }
 
     /**
--- a/jcstress-core/src/main/java/org/openjdk/jcstress/JCStress.java	Thu Oct 20 15:46:13 2016 +0200
+++ b/jcstress-core/src/main/java/org/openjdk/jcstress/JCStress.java	Fri Oct 21 16:56:01 2016 +0200
@@ -66,11 +66,8 @@
         DiskWriteCollector diskCollector = new DiskWriteCollector(opts.getResultFile());
         TestResultCollector sink = MuxCollector.of(printer, diskCollector);
 
-        TestExecutor executor = new TestExecutor(opts.getUserCPUs(), sink, true);
-        for (TestConfig cfg : configs) {
-            executor.submit(cfg);
-        }
-        executor.waitFinish();
+        TestExecutor executor = new TestExecutor(opts.getUserCPUs(), opts.getBatchSize(), sink, true);
+        executor.runAll(configs);
 
         diskCollector.close();
 
@@ -100,8 +97,6 @@
     }
 
     private List<TestConfig> prepareRunProgram(Set<String> tests) {
-        int tokenCounter = 0;
-
         List<TestConfig> configs = new ArrayList<>();
         if (opts.shouldFork()) {
             List<String> inputArgs = ManagementFactory.getRuntimeMXBean().getInputArguments();
@@ -111,7 +106,7 @@
                     fullArgs.addAll(inputArgs);
                     fullArgs.addAll(jvmArgs);
                     for (int f = 0; f < opts.getForks(); f++) {
-                        configs.add(new TestConfig(tokenCounter++, opts, TestList.getInfo(test), TestConfig.RunMode.FORKED, f, fullArgs));
+                        configs.add(new TestConfig(opts, TestList.getInfo(test), TestConfig.RunMode.FORKED, f, fullArgs));
                     }
                 }
             }
@@ -119,7 +114,7 @@
             for (String test : tests) {
                 TestInfo info = TestList.getInfo(test);
                 TestConfig.RunMode mode = info.requiresFork() ? TestConfig.RunMode.FORKED : TestConfig.RunMode.EMBEDDED;
-                configs.add(new TestConfig(tokenCounter++, opts, info, mode, -1, Collections.emptyList()));
+                configs.add(new TestConfig(opts, info, mode, -1, Collections.emptyList()));
             }
         }
 
--- a/jcstress-core/src/main/java/org/openjdk/jcstress/Options.java	Thu Oct 20 15:46:13 2016 +0200
+++ b/jcstress-core/src/main/java/org/openjdk/jcstress/Options.java	Fri Oct 21 16:56:01 2016 +0200
@@ -63,6 +63,7 @@
     private String resultFile;
     private int deoptRatio;
     private Collection<String> jvmArgs;
+    private int batchSize;
 
     public Options(String[] args) {
         this.args = args;
@@ -111,6 +112,10 @@
                 "affects the stride size: maximum footprint will never be exceeded, regardless of min/max stride sizes.")
                 .withRequiredArg().ofType(Integer.class).describedAs("MB");
 
+        OptionSpec<Integer> batchSize = parser.accepts("bs", "Maximum number of tests to execute in a single VM. Larger " +
+                "values will improve test performance, at expense of testing accuracy")
+                .withRequiredArg().ofType(Integer.class).describedAs("#");
+
         OptionSpec<Boolean> shouldYield = parser.accepts("yield", "Call Thread.yield() in busy loops.")
                 .withOptionalArg().ofType(Boolean.class).describedAs("bool");
 
@@ -187,26 +192,31 @@
             this.time = 50;
             this.iters = 1;
             this.forks = 0;
+            this.batchSize = 20;
         } else
         if (this.mode.equalsIgnoreCase("quick")) {
             this.time = 300;
             this.iters = 5;
             this.forks = 1;
+            this.batchSize = 20;
         } else
         if (this.mode.equalsIgnoreCase("default")) {
             this.time = orDefault(set.valueOf(time), 1000);
             this.iters = orDefault(set.valueOf(iters), 5);
             this.forks = orDefault(set.valueOf(forks), 1);
+            this.batchSize = orDefault(set.valueOf(batchSize), 5);
         } else
         if (this.mode.equalsIgnoreCase("tough")) {
             this.time = 1000;
             this.iters = 10;
             this.forks = 10;
+            this.batchSize = 1;
         } else
         if (this.mode.equalsIgnoreCase("stress")) {
             this.time = 1000;
             this.iters = 50;
             this.forks = 10;
+            this.batchSize = 1;
         } else {
             System.err.println("Unknown test mode: " + this.mode);
             System.err.println();
@@ -246,6 +256,7 @@
         out.printf("  Writing the test results to \"%s\"\n", resultFile);
         out.printf("  Parsing results to \"%s\"\n", resultDir);
         out.printf("  Running each test matching \"%s\" for %d forks, %d iterations, %d ms each\n", getTestFilter(), getForks(), getIterations(), getTime());
+        out.printf("  Each JVM would execute at most %d tests in the row.\n", getBatchSize());
         out.printf("  Solo stride size will be autobalanced within [%d, %d] elements, but taking no more than %d Mb.\n", getMinStride(), getMaxStride(), getMaxFootprintMb());
 
         out.println();
@@ -326,4 +337,8 @@
     public int getMaxFootprintMb() {
         return maxFootprint;
     }
+
+    public int getBatchSize() {
+        return batchSize;
+    }
 }
--- a/jcstress-core/src/main/java/org/openjdk/jcstress/TestExecutor.java	Thu Oct 20 15:46:13 2016 +0200
+++ b/jcstress-core/src/main/java/org/openjdk/jcstress/TestExecutor.java	Fri Oct 21 16:56:01 2016 +0200
@@ -27,147 +27,257 @@
 import org.openjdk.jcstress.infra.Status;
 import org.openjdk.jcstress.infra.collectors.TestResult;
 import org.openjdk.jcstress.infra.collectors.TestResultCollector;
-import org.openjdk.jcstress.infra.runners.Runner;
 import org.openjdk.jcstress.infra.runners.TestConfig;
 import org.openjdk.jcstress.link.BinaryLinkServer;
-import org.openjdk.jcstress.util.InputStreamDrainer;
+import org.openjdk.jcstress.util.HashMultimap;
+import org.openjdk.jcstress.util.Multimap;
 import org.openjdk.jcstress.vm.VMSupport;
 
-import java.io.ByteArrayOutputStream;
+import java.io.File;
 import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.util.ArrayList;
-import java.util.List;
+import java.nio.file.Files;
+import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
 
+/**
+ * Manages test execution for the entire run.
+ *
+ * This executor is deliberately single-threaded for two reasons:
+ *   a) Tests are heavily multithreaded and spawning new threads here may
+ *      deplete the thread budget sooner rather than later;
+ *   b) Dead-locks in scheduling logic are more visible without threads;
+ */
 public class TestExecutor {
 
-    private final ExecutorService pool;
+    private static final int SPIN_WAIT_DELAY_MS = 100;
+
+    static final AtomicInteger ID = new AtomicInteger();
+
     private final Semaphore semaphore;
     private final BinaryLinkServer server;
     private final int maxThreads;
+    private final int batchSize;
     private final TestResultCollector sink;
+    private final Multimap<BatchKey, TestConfig> tasks;
+    private final Set<VM> vms;
+    private final EmbeddedExecutor embeddedExecutor;
 
-    public TestExecutor(int maxThreads, TestResultCollector sink, boolean possiblyForked) throws IOException {
+    public TestExecutor(int maxThreads, int batchSize, TestResultCollector sink, boolean possiblyForked) throws IOException {
         this.maxThreads = maxThreads;
+        this.batchSize = batchSize;
         this.sink = sink;
-        this.pool = Executors.newCachedThreadPool(new ThreadFactory() {
-            private final AtomicInteger id = new AtomicInteger();
 
-            @Override
-            public Thread newThread(Runnable r) {
-                Thread t = new Thread(r);
-                t.setName("jcstress-worker-" + id.incrementAndGet());
-                t.setDaemon(true);
-                return t;
+        this.tasks = new HashMultimap<>();
+        this.vms = Collections.newSetFromMap(new ConcurrentHashMap<>());
+
+        semaphore = new Semaphore(maxThreads);
+        server = possiblyForked ? new BinaryLinkServer(maxThreads, sink) : null;
+        embeddedExecutor = new EmbeddedExecutor(sink, (cfg) -> semaphore.release(cfg.threads));
+    }
+
+    public void runAll(List<TestConfig> configs) throws InterruptedException {
+        for (TestConfig cfg : configs) {
+            switch (cfg.runMode) {
+                case EMBEDDED:
+                    waitForMoreThreads(cfg.threads);
+                    embeddedExecutor.submit(cfg);
+                    break;
+                case FORKED:
+                    BatchKey batchKey = BatchKey.getFrom(cfg);
+                    tasks.put(batchKey, cfg);
+
+                    Collection<TestConfig> curBatch = tasks.get(batchKey);
+                    if (curBatch.size() >= batchSize) {
+                        tasks.remove(batchKey);
+                        doSchedule(batchKey, curBatch);
+                    }
+                    break;
+                default:
+                    throw new IllegalStateException("Unknown mode: " + cfg.runMode);
             }
-        });
+        }
 
-        if (possiblyForked) {
-            semaphore = new Semaphore(maxThreads);
-            server = new BinaryLinkServer(maxThreads, sink);
-        } else {
-            semaphore = null;
-            server = null;
+        // Run down the remaining tasks
+        for (BatchKey key : tasks.keys()) {
+            Collection<TestConfig> curBatch = tasks.get(key);
+            if (!curBatch.isEmpty()) {
+                doSchedule(key, curBatch);
+            }
+        }
+
+        // Wait until all threads are done, which means everything got processed
+        waitForMoreThreads(maxThreads);
+
+        server.terminate();
+    }
+
+    private void doSchedule(BatchKey batchKey, Collection<TestConfig> configs)  {
+        // Make fat tasks bypass in exclusive mode:
+        final int threads = Math.min(batchKey.threads, maxThreads);
+        waitForMoreThreads(threads);
+        startVM(batchKey, configs);
+    }
+
+    private void waitForMoreThreads(int threads) {
+        while (!semaphore.tryAcquire(threads)) {
+            processReadyVMs();
+            try {
+                Thread.sleep(SPIN_WAIT_DELAY_MS);
+            } catch (InterruptedException e) {
+                // do nothing
+            }
         }
     }
 
-    void runForked(TestConfig config) {
-        try {
-            List<String> command = new ArrayList<>();
+    private void startVM(BatchKey batchKey, Collection<TestConfig> configs) {
+        String token = "fork-token-" + ID.incrementAndGet();
+        server.addTask(token, configs);
 
-            // basic Java line
-            command.addAll(VMSupport.getJavaInvokeLine());
+        VM vm = new VM(server.getHost(), server.getPort(), batchKey, token);
+        vms.add(vm);
+        vm.start();
+    }
 
-            // jvm args
-            command.addAll(config.jvmArgs);
+    private void stopVM(VM vm) {
+        vms.remove(vm);
+        semaphore.release(vm.key.threads);
+    }
 
-            command.add(ForkedMain.class.getName());
+    private void processReadyVMs() {
+        for (VM vm : vms) {
+            try {
+                if (vm.checkTermination()) {
+                    stopVM(vm);
+                }
+            } catch (ForkFailedException e) {
+                // Record the failure for the actual test
+                TestConfig failed = server.getCurrentTask(vm.token);
+                if (failed != null) {
+                    // TODO: Handle the VM bootup failure better, when failed == null
+                    TestResult result = new TestResult(failed, Status.VM_ERROR, -1);
+                    for (String i : e.getInfo()) {
+                        result.addAuxData(i);
+                    }
+                    sink.add(result);
+                }
 
-            command.add(server.getHost());
-            command.add(String.valueOf(server.getPort()));
+                stopVM(vm);
 
-            // which config should the forked VM pull?
-            command.add(String.valueOf(config.uniqueToken));
-
-            ProcessBuilder pb = new ProcessBuilder(command);
-            Process p = pb.start();
-
-            ByteArrayOutputStream baos = new ByteArrayOutputStream();
-
-            InputStreamDrainer errDrainer = new InputStreamDrainer(p.getErrorStream(), baos);
-            InputStreamDrainer outDrainer = new InputStreamDrainer(p.getInputStream(), baos);
-
-            errDrainer.start();
-            outDrainer.start();
-
-            int ecode = p.waitFor();
-
-            errDrainer.join();
-            outDrainer.join();
-
-            if (ecode != 0) {
-                // Test had failed, record this.
-                TestResult result = new TestResult(config, Status.VM_ERROR, -1);
-                result.addAuxData(new String(baos.toByteArray()).trim());
-                sink.add(result);
+                // Remaining tasks from the fork need to get back on queue
+                doSchedule(vm.key, server.removePendingTasks(vm.token));
             }
-        } catch (IOException | InterruptedException ex) {
-            ex.printStackTrace();
         }
     }
 
-    public void submit(TestConfig cfg) throws InterruptedException {
-        if (server == null || semaphore == null) {
-            throw new IllegalStateException("Embedded runner cannot accept tasks");
+    private static class VM {
+        private final String host;
+        private final int port;
+        private final BatchKey key;
+        private final String token;
+        private final File stdout;
+        private final File stderr;
+        private Process process;
+
+        public VM(String host, int port, BatchKey key, String token) {
+            this.host = host;
+            this.port = port;
+            this.key = key;
+            this.token = token;
+            try {
+                this.stdout = File.createTempFile("jcstress", "stdout");
+                this.stderr = File.createTempFile("jcstress", "stderr");
+            } catch (IOException e) {
+                throw new IllegalStateException(e);
+            }
         }
 
-        // Make fat tasks bypass in exclusive mode:
-        final int threads = Math.min(cfg.threads, maxThreads);
-        semaphore.acquire(threads);
+        void start() throws ForkFailedException {
+            try {
+                List<String> command = new ArrayList<>();
 
-        pool.submit(() -> {
-            try {
-                switch (cfg.runMode) {
-                    case EMBEDDED:
-                        runEmbedded(cfg);
-                        break;
-                    case FORKED:
-                        server.addTask(cfg);
-                        runForked(cfg);
-                        break;
+                // basic Java line
+                command.addAll(VMSupport.getJavaInvokeLine());
+
+                // jvm args
+                command.addAll(key.jvmArgs);
+
+                command.add(ForkedMain.class.getName());
+
+                command.add(host);
+                command.add(String.valueOf(port));
+
+                // which config should the forked VM pull?
+                command.add(token);
+
+                ProcessBuilder pb = new ProcessBuilder(command);
+                pb.redirectOutput(stdout);
+                pb.redirectError(stderr);
+                process = pb.start();
+            } catch (IOException ex) {
+                throw new ForkFailedException(ex.getMessage());
+            }
+        }
+
+        boolean checkTermination() {
+            if (process.isAlive()) {
+                return false;
+            } else {
+                // Try to poll the exit code, and fail if it's not zero.
+                try {
+                    int ecode = process.waitFor();
+                    if (ecode != 0) {
+                        List<String> output = new ArrayList<>();
+                        try {
+                            output.addAll(Files.readAllLines(stdout.toPath()));
+                        } catch (IOException e) {
+                            output.add("Failed to read stdout: " + e.getMessage());
+                        }
+                        try {
+                            output.addAll(Files.readAllLines(stderr.toPath()));
+                        } catch (IOException e) {
+                            output.add("Failed to read stderr: " + e.getMessage());
+                        }
+                        throw new ForkFailedException(output);
+                    }
+                } catch (InterruptedException ex) {
+                    throw new ForkFailedException(ex.getMessage());
                 }
-            } finally {
-                semaphore.release(threads);
+                return true;
             }
-        });
+        }
     }
 
-    public void waitFinish() throws InterruptedException {
-        if (server == null || semaphore == null) {
-            throw new IllegalStateException("Embedded runner cannot accept tasks");
+    static class BatchKey {
+        private int threads;
+        private List<String> jvmArgs;
+
+        BatchKey(int threads, List<String> jvmArgs) {
+            this.threads = threads;
+            this.jvmArgs = jvmArgs;
         }
-        semaphore.acquire(maxThreads);
-        pool.shutdown();
-        pool.awaitTermination(1, TimeUnit.DAYS);
-        server.terminate();
-    }
 
-    public void runEmbedded(TestConfig config) {
-        try {
-            Class<?> aClass = Class.forName(config.generatedRunnerName);
-            Constructor<?> cnstr = aClass.getConstructor(TestConfig.class, TestResultCollector.class, ExecutorService.class);
-            Runner<?> o = (Runner<?>) cnstr.newInstance(config, sink, pool);
-            o.run();
-        } catch (ClassFormatError e) {
-            TestResult result = new TestResult(config, Status.API_MISMATCH, 0);
-            result.addAuxData(e.getMessage());
-            sink.add(result);
-        } catch (Exception ex) {
-            TestResult result = new TestResult(config, Status.TEST_ERROR, 0);
-            result.addAuxData(ex.getMessage());
-            sink.add(result);
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+
+            BatchKey batchKey = (BatchKey) o;
+
+            if (threads != batchKey.threads) return false;
+            return jvmArgs.equals(batchKey.jvmArgs);
+
+        }
+
+        @Override
+        public int hashCode() {
+            int result = threads;
+            result = 31 * result + jvmArgs.hashCode();
+            return result;
+        }
+
+        static BatchKey getFrom(TestConfig cfg) {
+            return new BatchKey(cfg.threads, cfg.jvmArgs);
         }
     }
 
--- a/jcstress-core/src/main/java/org/openjdk/jcstress/infra/runners/TestConfig.java	Thu Oct 20 15:46:13 2016 +0200
+++ b/jcstress-core/src/main/java/org/openjdk/jcstress/infra/runners/TestConfig.java	Fri Oct 21 16:56:01 2016 +0200
@@ -34,7 +34,6 @@
 
 public class TestConfig implements Serializable {
 
-    public final int uniqueToken;
     public final SpinLoopStyle spinLoopStyle;
     public final boolean verbose;
     public final int time;
@@ -55,8 +54,7 @@
         FORKED,
     }
 
-    public TestConfig(int uniqueToken, Options opts, TestInfo info, RunMode runMode, int forkId, List<String> jvmArgs) {
-        this.uniqueToken = uniqueToken;
+    public TestConfig(Options opts, TestInfo info, RunMode runMode, int forkId, List<String> jvmArgs) {
         this.runMode = runMode;
         this.forkId = forkId;
         this.jvmArgs = jvmArgs;
--- a/jcstress-core/src/main/java/org/openjdk/jcstress/link/BinaryLinkClient.java	Thu Oct 20 15:46:13 2016 +0200
+++ b/jcstress-core/src/main/java/org/openjdk/jcstress/link/BinaryLinkClient.java	Fri Oct 21 16:56:01 2016 +0200
@@ -25,14 +25,13 @@
 package org.openjdk.jcstress.link;
 
 import org.openjdk.jcstress.infra.collectors.TestResult;
-import org.openjdk.jcstress.infra.collectors.TestResultCollector;
 import org.openjdk.jcstress.infra.runners.TestConfig;
 import org.openjdk.jcstress.util.FileUtils;
 
 import java.io.*;
 import java.net.Socket;
 
-public final class BinaryLinkClient implements TestResultCollector {
+public final class BinaryLinkClient {
 
     private static final int RESET_EACH = Integer.getInteger("jcstress.link.resetEach", 100);
     private static final int BUFFER_SIZE = Integer.getInteger("jcstress.link.bufferSize", 64*1024);
@@ -108,7 +107,7 @@
         }
     }
 
-    public TestConfig nextJob(int token) throws IOException, ClassNotFoundException {
+    public TestConfig nextJob(String token) throws IOException, ClassNotFoundException {
         synchronized (lock) {
             pushFrame(new JobRequestFrame(token));
 
@@ -121,10 +120,9 @@
         }
     }
 
-    @Override
-    public void add(TestResult result) {
+    public void addResult(String token, TestResult result) {
         try {
-            pushFrame(new ResultsFrame(result));
+            pushFrame(new ResultsFrame(token, result));
         } catch (IOException e) {
             throw new IllegalStateException(e);
         }
--- a/jcstress-core/src/main/java/org/openjdk/jcstress/link/BinaryLinkServer.java	Thu Oct 20 15:46:13 2016 +0200
+++ b/jcstress-core/src/main/java/org/openjdk/jcstress/link/BinaryLinkServer.java	Fri Oct 21 16:56:01 2016 +0200
@@ -31,8 +31,10 @@
 
 import java.io.*;
 import java.net.*;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.List;
 import java.util.concurrent.*;
 
 /**
@@ -50,13 +52,15 @@
     private final ServerSocket server;
     private final InetAddress listenAddress;
     private final TestResultCollector out;
-    private final ConcurrentMap<Integer, TestConfig> configs;
+    private final ConcurrentMap<String, List<TestConfig>> configs;
+    private final ConcurrentMap<String, TestConfig> currentTask;
     private final ExecutorService executor;
     private final Collection<Handler> outstandingHandlers;
 
     public BinaryLinkServer(int workers, TestResultCollector out) throws IOException {
         this.out = out;
         this.configs = new ConcurrentHashMap<>();
+        this.currentTask = new ConcurrentHashMap<>();
 
         listenAddress = getListenAddress();
         server = new ServerSocket(LINK_PORT, 50, listenAddress);
@@ -97,11 +101,20 @@
         }
     }
 
-    public void addTask(TestConfig cfg) {
-        configs.put(cfg.uniqueToken, cfg);
+    public void addTask(String token, Collection<TestConfig> cfgs) {
+        List<TestConfig> exist = configs.put(token, new ArrayList<>(cfgs));
+        if (exist != null) {
+            throw new IllegalStateException("Trying to overwrite the same token");
+        }
         executor.submit(new Handler(server));
     }
 
+    public List<TestConfig> removePendingTasks(String token) {
+        List<TestConfig> conf = configs.get(token);
+        configs.remove(token);
+        return conf;
+    }
+
     public String getHost() {
         return listenAddress.getHostAddress();
     }
@@ -111,6 +124,10 @@
         return server.getLocalPort();
     }
 
+    public TestConfig getCurrentTask(String token) {
+        return currentTask.get(token);
+    }
+
     private final class Handler implements Runnable {
         private final ServerSocket server;
         private Socket socket;
@@ -140,15 +157,21 @@
                 Object obj;
                 while ((obj = ois.readObject()) != null) {
                     if (obj instanceof JobRequestFrame) {
-                        config = configs.remove(((JobRequestFrame) obj).getToken());
-                        if (config == null) {
-                            throw new IllegalStateException("No jobs left, this should not happen");
+                        String tkn = ((JobRequestFrame) obj).getToken();
+                        List<TestConfig> cfgs = configs.get(tkn);
+                        if (cfgs.isEmpty()) {
+                            oos.writeObject(new JobResponseFrame(null));
+                        } else {
+                            config = cfgs.remove(0);
+                            currentTask.put(tkn, config);
+                            oos.writeObject(new JobResponseFrame(config));
                         }
-                        oos.writeObject(new JobResponseFrame(config));
                         oos.flush();
                     }
                     if (obj instanceof ResultsFrame) {
-                        out.add(((ResultsFrame) obj).getRes());
+                        ResultsFrame rf = (ResultsFrame) obj;
+                        out.add(rf.getRes());
+                        currentTask.remove(rf.getToken());
                     }
                     if (obj instanceof FinishingFrame) {
                         // close the streams
--- a/jcstress-core/src/main/java/org/openjdk/jcstress/link/JobRequestFrame.java	Thu Oct 20 15:46:13 2016 +0200
+++ b/jcstress-core/src/main/java/org/openjdk/jcstress/link/JobRequestFrame.java	Fri Oct 21 16:56:01 2016 +0200
@@ -28,13 +28,13 @@
 
 class JobRequestFrame implements Serializable {
     private static final long serialVersionUID = 2082214387637725282L;
-    private int token;
+    private String token;
 
-    public JobRequestFrame(int token) {
+    public JobRequestFrame(String token) {
         this.token = token;
     }
 
-    public int getToken() {
+    public String getToken() {
         return token;
     }
 }
--- a/jcstress-core/src/main/java/org/openjdk/jcstress/link/ResultsFrame.java	Thu Oct 20 15:46:13 2016 +0200
+++ b/jcstress-core/src/main/java/org/openjdk/jcstress/link/ResultsFrame.java	Fri Oct 21 16:56:01 2016 +0200
@@ -31,12 +31,15 @@
 class ResultsFrame implements Serializable {
     private static final long serialVersionUID = -5627086531281515824L;
 
+    private final String token;
     private final TestResult res;
 
-    public ResultsFrame(TestResult res) {
+    public ResultsFrame(String token, TestResult res) {
+        this.token = token;
         this.res = res;
     }
 
+    public String getToken() { return token; }
     public TestResult getRes() {
         return res;
     }