changeset 323:62580530cd03

7901807: Deadlock while trying to remove Handler during termination Summary: rewrite BinaryLinkServer to avoid synchronized collections.
author shade
date Wed, 28 Sep 2016 11:48:05 +0200
parents 0eecf3f89210
children 11b5f55dcddc
files jcstress-core/src/main/java/org/openjdk/jcstress/JCStress.java jcstress-core/src/main/java/org/openjdk/jcstress/link/BinaryLinkClient.java jcstress-core/src/main/java/org/openjdk/jcstress/link/BinaryLinkServer.java
diffstat 3 files changed, 70 insertions(+), 127 deletions(-) [+]
line wrap: on
line diff
--- a/jcstress-core/src/main/java/org/openjdk/jcstress/JCStress.java	Fri Sep 23 15:56:58 2016 +0200
+++ b/jcstress-core/src/main/java/org/openjdk/jcstress/JCStress.java	Wed Sep 28 11:48:05 2016 +0200
@@ -116,7 +116,7 @@
         DiskWriteCollector diskCollector = new DiskWriteCollector(opts.getResultFile());
         TestResultCollector sink = MuxCollector.of(printer, diskCollector);
 
-        BinaryLinkServer server = new BinaryLinkServer(sink);
+        BinaryLinkServer server = new BinaryLinkServer(opts.getUserCPUs(), sink);
 
         Scheduler scheduler = new Scheduler(opts.getUserCPUs());
         for (TestConfig cfg : configs) {
--- a/jcstress-core/src/main/java/org/openjdk/jcstress/link/BinaryLinkClient.java	Fri Sep 23 15:56:58 2016 +0200
+++ b/jcstress-core/src/main/java/org/openjdk/jcstress/link/BinaryLinkClient.java	Wed Sep 28 11:48:05 2016 +0200
@@ -36,6 +36,7 @@
 
     private static final int RESET_EACH = Integer.getInteger("jcstress.link.resetEach", 100);
     private static final int BUFFER_SIZE = Integer.getInteger("jcstress.link.bufferSize", 64*1024);
+    private static final int LINK_TIMEOUT_MS = Integer.getInteger("jcstress.link.timeoutMs", 30*1000);
 
     private final Object lock;
 
@@ -48,6 +49,7 @@
     public BinaryLinkClient(String hostName, int hostPort) throws IOException {
         this.lock = new Object();
         this.clientSocket = new Socket(hostName, hostPort);
+        clientSocket.setSoTimeout(LINK_TIMEOUT_MS);
 
         // Initialize the OOS first, and flush, letting the other party read the stream header.
         this.oos = new ObjectOutputStream(new BufferedOutputStream(clientSocket.getOutputStream(), BUFFER_SIZE));
--- a/jcstress-core/src/main/java/org/openjdk/jcstress/link/BinaryLinkServer.java	Fri Sep 23 15:56:58 2016 +0200
+++ b/jcstress-core/src/main/java/org/openjdk/jcstress/link/BinaryLinkServer.java	Wed Sep 28 11:48:05 2016 +0200
@@ -24,16 +24,15 @@
  */
 package org.openjdk.jcstress.link;
 
+import org.openjdk.jcstress.infra.Status;
+import org.openjdk.jcstress.infra.collectors.TestResult;
 import org.openjdk.jcstress.infra.collectors.TestResultCollector;
 import org.openjdk.jcstress.infra.runners.TestConfig;
 
 import java.io.*;
 import java.net.*;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
+import java.util.List;
+import java.util.concurrent.*;
 
 /**
  * Accepts the binary data from the forked VM and pushes it to parent VM
@@ -43,50 +42,31 @@
 public final class BinaryLinkServer {
 
     private static final int BUFFER_SIZE = Integer.getInteger("jcstress.link.bufferSize", 64*1024);
+    private static final String LINK_ADDRESS = System.getProperty("jcstress.link.address");
+    private static final int LINK_PORT = Integer.getInteger("jcstress.link.port", 0);
+    private static final int LINK_TIMEOUT_MS = Integer.getInteger("jcstress.link.timeoutMs", 30*1000);
 
-    private final Acceptor acceptor;
-    private final Collection<Handler> handlers;
+    private final ServerSocket server;
+    private final InetAddress listenAddress;
     private final TestResultCollector out;
     private final ConcurrentMap<Integer, TestConfig> configs;
+    private final ExecutorService executor;
 
-    public BinaryLinkServer(TestResultCollector out) throws IOException {
+    public BinaryLinkServer(int workers, TestResultCollector out) throws IOException {
         this.out = out;
         this.configs = new ConcurrentHashMap<>();
-        acceptor = new Acceptor();
-        acceptor.start();
 
-        handlers = Collections.synchronizedCollection(new ArrayList<>());
-    }
-
-    public void terminate() {
-        acceptor.close();
-
-        synchronized (handlers) {
-            for (Handler h : handlers) {
-                h.close();
-            }
-        }
-
-        try {
-            acceptor.join();
-            synchronized (handlers) {
-                for (Handler h : handlers) {
-                    h.join();
-                }
-            }
-        } catch (InterruptedException e) {
-            // ignore
-        }
-
-        handlers.clear();
+        listenAddress = getListenAddress();
+        server = new ServerSocket(LINK_PORT, 50, listenAddress);
+        server.setSoTimeout(LINK_TIMEOUT_MS);
+        executor = Executors.newFixedThreadPool(workers);
     }
 
     private InetAddress getListenAddress() {
         // Try to use user-provided override first.
-        String addr = System.getProperty("jcstress.link.address");
-        if (addr != null) {
+        if (LINK_ADDRESS != null) {
             try {
-                return InetAddress.getByName(addr);
+                return InetAddress.getByName(LINK_ADDRESS);
             } catch (UnknownHostException e) {
                 // override failed, notify user
                 throw new IllegalStateException("Can not initialize binary link.", e);
@@ -96,99 +76,70 @@
         return InetAddress.getLoopbackAddress();
     }
 
-    private int getListenPort() {
-        return Integer.getInteger("jmh.link.port", 0);
+    public void terminate() {
+        try {
+            server.close();
+        } catch (IOException e) {
+            // do nothing
+        }
+
+        List<Runnable> outstanding = executor.shutdownNow();
+        for (Runnable r : outstanding) {
+            Handler h = (Handler) r;
+            h.close();
+        }
     }
 
     public void addTask(TestConfig cfg) {
         configs.put(cfg.uniqueToken, cfg);
+        executor.submit(new Handler(server));
     }
 
-    private final class Acceptor extends Thread {
+    public String getHost() {
+        return listenAddress.getHostAddress();
+    }
 
+    public int getPort() {
+        // Poll the actual listen port, in case it is ephemeral
+        return server.getLocalPort();
+    }
+
+    private final class Handler implements Runnable {
         private final ServerSocket server;
-        private final InetAddress listenAddress;
+        private Socket socket;
 
-        public Acceptor() throws IOException {
-            listenAddress = getListenAddress();
-            server = new ServerSocket(getListenPort(), 50, listenAddress);
+        public Handler(ServerSocket server) {
+            this.server = server;
         }
 
         @Override
         public void run() {
+            TestConfig config = null;
             try {
-                while (!Thread.interrupted()) {
-                    Socket clientSocket = server.accept();
-                    Handler r = new Handler(clientSocket);
-                    handlers.add(r);
-                    r.start();
-                }
-            } catch (SocketException e) {
-                // assume this is "Socket closed", return
-            } catch (IOException e) {
-                throw new IllegalStateException(e);
-            } finally {
-                close();
-            }
-        }
+                socket = server.accept();
 
-        public String getHost() {
-            return listenAddress.getHostAddress();
-        }
+                InputStream is = socket.getInputStream();
+                OutputStream os = socket.getOutputStream();
 
-        public int getPort() {
-            // Poll the actual listen port, in case it is ephemeral
-            return server.getLocalPort();
-        }
+                // eager OOS initialization, let the other party read the stream header
+                ObjectOutputStream oos = new ObjectOutputStream(new BufferedOutputStream(os, BUFFER_SIZE));
+                oos.flush();
 
-        public void close() {
-            try {
-                server.close();
-            } catch (IOException e) {
-                // do nothing
-            }
-        }
-    }
-
-    public String getHost() {
-        return acceptor.getHost();
-    }
-
-    public int getPort() {
-        return acceptor.getPort();
-    }
-
-    private final class Handler extends Thread {
-        private final InputStream is;
-        private final Socket socket;
-        private ObjectInputStream ois;
-        private final OutputStream os;
-        private ObjectOutputStream oos;
-
-        public Handler(Socket socket) throws IOException {
-            this.socket = socket;
-            this.is = socket.getInputStream();
-            this.os = socket.getOutputStream();
-
-            // eager OOS initialization, let the other party read the stream header
-            oos = new ObjectOutputStream(new BufferedOutputStream(os, BUFFER_SIZE));
-            oos.flush();
-        }
-
-        @Override
-        public void run() {
-            try {
                 // late OIS initialization, otherwise we'll block reading the header
-                ois = new ObjectInputStream(new BufferedInputStream(is, BUFFER_SIZE));
+                ObjectInputStream ois = new ObjectInputStream(new BufferedInputStream(is, BUFFER_SIZE));
 
                 Object obj;
                 while ((obj = ois.readObject()) != null) {
-
                     if (obj instanceof JobRequestFrame) {
-                        handleHandshake((JobRequestFrame) obj);
+                        config = configs.remove(((JobRequestFrame) obj).getToken());
+                        if (config == null) {
+                            throw new IllegalStateException("No jobs left, this should not happen");
+                        }
+                        oos.writeObject(new JobResponseFrame(config));
+                        oos.flush();
                     }
                     if (obj instanceof ResultsFrame) {
-                        handleResults((ResultsFrame) obj);
+                        out.add(((ResultsFrame) obj).getRes());
                     }
                     if (obj instanceof FinishingFrame) {
                         // close the streams
@@ -198,32 +149,22 @@
             } catch (EOFException e) {
                 // ignore
             } catch (Exception e) {
-                System.out.println("<binary link had failed, forked VM corrupted the stream?");
-                e.printStackTrace(System.out);
+                TestResult tr = new TestResult(config, Status.VM_ERROR, -1);
+                tr.addAuxData("<binary link had failed, forked VM corrupted the stream?");
+                tr.addAuxData(e.getMessage());
+                out.add(tr);
             } finally {
                 close();
-                handlers.remove(this);
             }
         }
 
-        private void handleResults(ResultsFrame obj) {
-            out.add(obj.getRes());
-        }
-
-        private void handleHandshake(JobRequestFrame obj) throws IOException {
-            TestConfig poll = configs.remove(obj.getToken());
-            if (poll == null) {
-                throw new IllegalStateException("No jobs left, this should not happen");
-            }
-            oos.writeObject(new JobResponseFrame(poll));
-            oos.flush();
-        }
-
         public void close() {
-            try {
-                socket.close();
-            } catch (IOException e) {
-                // ignore
+            if (socket != null) {
+                try {
+                    socket.close();
+                } catch (IOException e) {
+                    // ignore
+                }
             }
         }