changeset 911:5070b7fd46e7

link: synchronize accesses to binary link clients. Helps to maintain correctness when several threads are writing to stdout/stderr.
author shade
date Thu, 17 Jul 2014 16:10:18 +0400
parents f46c7991c62f
children 01fe6ebab29e
files jmh-core/src/main/java/org/openjdk/jmh/runner/link/BinaryLinkClient.java
diffstat 1 files changed, 58 insertions(+), 52 deletions(-) [+]
line wrap: on
line diff
--- a/jmh-core/src/main/java/org/openjdk/jmh/runner/link/BinaryLinkClient.java	Wed Jul 16 22:29:25 2014 +0400
+++ b/jmh-core/src/main/java/org/openjdk/jmh/runner/link/BinaryLinkClient.java	Thu Jul 17 16:10:18 2014 +0400
@@ -38,6 +38,7 @@
 import java.io.ObjectOutputStream;
 import java.io.OutputStream;
 import java.io.PrintStream;
+import java.io.Serializable;
 import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.Method;
 import java.lang.reflect.Proxy;
@@ -46,14 +47,16 @@
 
 public final class BinaryLinkClient {
 
+    private final Object lock;
+
     private final Socket clientSocket;
-
     private final ObjectOutputStream oos;
     private final ObjectInputStream ois;
-    private ForwardingPrintStream streamErr;
-    private ForwardingPrintStream streamOut;
+    private final ForwardingPrintStream streamErr;
+    private final ForwardingPrintStream streamOut;
 
     public BinaryLinkClient(String hostName, int hostPort) throws IOException {
+        this.lock = new Object();
         this.clientSocket = new Socket(hostName, hostPort);
         this.oos = new ObjectOutputStream(clientSocket.getOutputStream());
         this.ois = new ObjectInputStream(clientSocket.getInputStream());
@@ -61,70 +64,76 @@
         this.streamOut = new ForwardingPrintStream(OutputFrame.Type.OUT);
     }
 
-    public Options requestOptions() throws IOException, ClassNotFoundException {
-        oos.writeObject(new InfraFrame(InfraFrame.Type.OPTIONS_REQUEST));
-        Object reply = ois.readObject();
-        if (reply instanceof OptionsFrame) {
-            return (((OptionsFrame) reply).getOpts());
-        } else {
-            throw new IllegalStateException("Got the erroneous reply: " + reply);
+    private void pushFrame(Serializable frame) throws IOException {
+        synchronized (lock) {
+            oos.writeObject(frame);
+            oos.flush();
         }
     }
 
-    public InvocationHandler getOutputFormatHandler() {
-        return new InvocationHandler() {
-            @Override
-            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
-                oos.writeObject(new OutputFormatFrame(ClassConventions.getMethodName(method), args));
-                oos.flush();
-                return null; // expect null
-            }
-        };
+    public void close() throws IOException {
+        synchronized (lock) {
+            oos.writeObject(new FinishingFrame());
+            FileUtils.safelyClose(oos);
+            FileUtils.safelyClose(streamErr);
+            FileUtils.safelyClose(streamOut);
+            clientSocket.close();
+        }
     }
 
-    public void close() throws IOException {
-        oos.writeObject(new FinishingFrame());
-        FileUtils.safelyClose(oos);
-        FileUtils.safelyClose(streamErr);
-        FileUtils.safelyClose(streamOut);
-        clientSocket.close();
+    public Options requestOptions() throws IOException, ClassNotFoundException {
+        synchronized (lock) {
+            pushFrame(new InfraFrame(InfraFrame.Type.OPTIONS_REQUEST));
+
+            Object reply = ois.readObject();
+            if (reply instanceof OptionsFrame) {
+                return (((OptionsFrame) reply).getOpts());
+            } else {
+                throw new IllegalStateException("Got the erroneous reply: " + reply);
+            }
+        }
+    }
+
+    public ActionPlan requestPlan() throws IOException, ClassNotFoundException {
+        synchronized (lock) {
+            pushFrame(new InfraFrame(InfraFrame.Type.ACTION_PLAN_REQUEST));
+
+            Object reply = ois.readObject();
+            if (reply instanceof ActionPlanFrame) {
+                return ((ActionPlanFrame) reply).getActionPlan();
+            } else {
+                throw new IllegalStateException("Got the erroneous reply: " + reply);
+            }
+        }
     }
 
     public void pushResults(Multimap<BenchmarkParams, BenchmarkResult> res) throws IOException {
-        oos.writeObject(new ResultsFrame(res));
-        oos.flush();
-    }
-
-    public ActionPlan requestPlan() throws IOException, ClassNotFoundException {
-        oos.writeObject(new InfraFrame(InfraFrame.Type.ACTION_PLAN_REQUEST));
-        oos.flush();
-
-        Object reply = ois.readObject();
-        if (reply instanceof ActionPlanFrame) {
-            return ((ActionPlanFrame) reply).getActionPlan();
-        } else {
-            throw new IllegalStateException("Got the erroneous reply: " + reply);
-        }
+        pushFrame(new ResultsFrame(res));
     }
 
     public void pushException(BenchmarkException error) throws IOException {
-        oos.writeObject(new ExceptionFrame(error));
-        oos.flush();
+        pushFrame(new ExceptionFrame(error));
+    }
+
+    public PrintStream getOutStream() {
+        return streamOut;
     }
 
     public PrintStream getErrStream() {
         return streamErr;
     }
 
-    public PrintStream getOutStream() {
-        return streamOut;
-    }
-
     public OutputFormat getOutputFormatHook() {
         return (OutputFormat) Proxy.newProxyInstance(
                 Thread.currentThread().getContextClassLoader(),
                 new Class[]{OutputFormat.class},
-                getOutputFormatHandler()
+                new InvocationHandler() {
+                    @Override
+                    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
+                        pushFrame(new OutputFormatFrame(ClassConventions.getMethodName(method), args));
+                        return null; // expect null
+                    }
+                }
         );
     }
 
@@ -133,20 +142,17 @@
             super(new OutputStream() {
                 @Override
                 public void write(int b) throws IOException {
-                    oos.writeObject(new OutputFrame(type, new byte[]{(byte) (b & 0xFF)}));
-                    oos.flush();
+                    pushFrame(new OutputFrame(type, new byte[]{(byte) (b & 0xFF)}));
                 }
 
                 @Override
                 public void write(byte[] b) throws IOException {
-                    oos.writeObject(new OutputFrame(type, Arrays.copyOf(b, b.length)));
-                    oos.flush();
+                    pushFrame(new OutputFrame(type, Arrays.copyOf(b, b.length)));
                 }
 
                 @Override
                 public void write(byte[] b, int off, int len) throws IOException {
-                    oos.writeObject(new OutputFrame(type, Arrays.copyOfRange(b, off, len + off)));
-                    oos.flush();
+                    pushFrame(new OutputFrame(type, Arrays.copyOfRange(b, off, len + off)));
                 }
             });
         }