changeset 17744:5fcfc9e09966

8187044: HttpClient ConnectionPool may spawn several concurrent CacheCleaner and prevent early GC of HttpClient. Summary: Fixes CacheCleaner creation logic in ConnectionPool. Reviewed-by: chegar
author dfuchs
date Fri, 01 Sep 2017 18:18:09 +0100
parents 4846f1bc6d2b
children f59720adabf8
files src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ConnectionPool.java test/java/net/httpclient/whitebox/Driver.java test/java/net/httpclient/whitebox/jdk.incubator.httpclient/jdk/incubator/http/ConnectionPoolTest.java
diffstat 3 files changed, 331 insertions(+), 20 deletions(-) [+]
line wrap: on
line diff
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ConnectionPool.java	Fri Sep 01 08:15:52 2017 -0700
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ConnectionPool.java	Fri Sep 01 18:18:09 2017 +0100
@@ -25,11 +25,14 @@
 
 package jdk.incubator.http;
 
+import java.lang.ref.WeakReference;
 import java.net.InetSocketAddress;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.ListIterator;
 import java.util.Objects;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 import jdk.incubator.http.internal.common.Utils;
 
 /**
@@ -37,6 +40,21 @@
  */
 final class ConnectionPool {
 
+    // These counters are used to distribute ids for debugging
+    // The ACTIVE_CLEANER_COUNTER will tell how many CacheCleaner
+    // are active at a given time. It will increase when a new
+    // CacheCleaner is started and decrease when it exits.
+    static final AtomicLong ACTIVE_CLEANER_COUNTER = new AtomicLong();
+    // The POOL_IDS_COUNTER increases each time a new ConnectionPool
+    // is created. It may wrap and become negative but will never be
+    // decremented.
+    static final AtomicLong POOL_IDS_COUNTER = new AtomicLong();
+    // The cleanerCounter is used to name cleaner threads within a
+    // a connection pool, and increments monotically.
+    // It may wrap and become negative but will never be
+    // decremented.
+    final AtomicLong cleanerCounter = new AtomicLong();
+
     static final long KEEP_ALIVE = Utils.getIntegerNetProperty(
             "jdk.httpclient.keepalive.timeout", 1200); // seconds
 
@@ -44,7 +62,12 @@
 
     final HashMap<CacheKey,LinkedList<HttpConnection>> plainPool;
     final HashMap<CacheKey,LinkedList<HttpConnection>> sslPool;
-    CacheCleaner cleaner;
+    // A monotically increasing id for this connection pool.
+    // It may be negative (that's OK)
+    // Mostly used for debugging purposes when looking at thread dumps.
+    // Global scope.
+    final long poolID = POOL_IDS_COUNTER.incrementAndGet();
+    final AtomicReference<CacheCleaner> cleanerRef;
 
     /**
      * Entries in connection pool are keyed by destination address and/or
@@ -105,6 +128,7 @@
         plainPool = new HashMap<>();
         sslPool = new HashMap<>();
         expiryList = new LinkedList<>();
+        cleanerRef = new AtomicReference<>();
     }
 
     void start() {
@@ -143,7 +167,7 @@
     findConnection(CacheKey key,
                    HashMap<CacheKey,LinkedList<HttpConnection>> pool) {
         LinkedList<HttpConnection> l = pool.get(key);
-        if (l == null || l.size() == 0) {
+        if (l == null || l.isEmpty()) {
             return null;
         } else {
             HttpConnection c = l.removeFirst();
@@ -175,19 +199,36 @@
         l.add(c);
     }
 
+    static String makeCleanerName(long poolId, long cleanerId) {
+        return "HTTP-Cache-cleaner-" + poolId + "-" + cleanerId;
+    }
+
     // only runs while entries exist in cache
-
-    final class CacheCleaner extends Thread {
+    final static class CacheCleaner extends Thread {
 
         volatile boolean stopping;
+        // A monotically increasing id. May wrap and become negative (that's OK)
+        // Mostly used for debugging purposes when looking at thread dumps.
+        // Scoped per connection pool.
+        final long cleanerID;
+        // A reference to the owning ConnectionPool.
+        // This reference's referent may become null if the HttpClientImpl
+        // that owns this pool is GC'ed.
+        final WeakReference<ConnectionPool> ownerRef;
 
-        CacheCleaner() {
-            super(null, null, "HTTP-Cache-cleaner", 0, false);
+        CacheCleaner(ConnectionPool owner) {
+            this(owner, owner.cleanerCounter.incrementAndGet());
+        }
+
+        CacheCleaner(ConnectionPool owner, long cleanerID) {
+            super(null, null, makeCleanerName(owner.poolID, cleanerID), 0, false);
+            this.cleanerID = cleanerID;
+            this.ownerRef = new WeakReference<>(owner);
             setDaemon(true);
         }
 
         synchronized boolean stopping() {
-            return stopping;
+            return stopping || ownerRef.get() == null;
         }
 
         synchronized void stopCleaner() {
@@ -196,11 +237,19 @@
 
         @Override
         public void run() {
-            while (!stopping()) {
-                try {
-                    Thread.sleep(3000);
-                } catch (InterruptedException e) {}
-                cleanCache();
+            ACTIVE_CLEANER_COUNTER.incrementAndGet();
+            try {
+                while (!stopping()) {
+                    try {
+                        Thread.sleep(3000);
+                    } catch (InterruptedException e) {}
+                    ConnectionPool owner = ownerRef.get();
+                    if (owner == null) return;
+                    owner.cleanCache(this);
+                    owner = null;
+                }
+            } finally {
+                ACTIVE_CLEANER_COUNTER.decrementAndGet();
             }
         }
     }
@@ -217,13 +266,15 @@
                 return;
             }
         }
-        if (expiryList.isEmpty()) {
+        CacheCleaner cleaner = this.cleanerRef.get();
+        if (expiryList.isEmpty() && cleaner != null) {
+            this.cleanerRef.compareAndSet(cleaner, null);
             cleaner.stopCleaner();
-            cleaner = null;
+            cleaner.interrupt();
         }
     }
 
-    private void cleanCache() {
+    private void cleanCache(CacheCleaner cleaner) {
         long now = System.currentTimeMillis() / 1000;
         LinkedList<HttpConnection> closelist = new LinkedList<>();
 
@@ -242,6 +293,10 @@
                     }
                 }
             }
+            if (expiryList.isEmpty() && cleaner != null) {
+                this.cleanerRef.compareAndSet(cleaner, null);
+                cleaner.stopCleaner();
+            }
         }
         for (HttpConnection c : closelist) {
             //System.out.println ("KAC: closing " + c);
@@ -252,10 +307,13 @@
     private synchronized void addToExpiryList(HttpConnection conn) {
         long now = System.currentTimeMillis() / 1000;
         long then = now + KEEP_ALIVE;
-
         if (expiryList.isEmpty()) {
-            cleaner = new CacheCleaner();
-            cleaner.start();
+            CacheCleaner cleaner = new CacheCleaner(this);
+            if (this.cleanerRef.compareAndSet(null, cleaner)) {
+                cleaner.start();
+            }
+            expiryList.add(new ExpiryEntry(conn, then));
+            return;
         }
 
         ListIterator<ExpiryEntry> li = expiryList.listIterator();
--- a/test/java/net/httpclient/whitebox/Driver.java	Fri Sep 01 08:15:52 2017 -0700
+++ b/test/java/net/httpclient/whitebox/Driver.java	Fri Sep 01 18:18:09 2017 +0100
@@ -23,9 +23,10 @@
 
 /*
  * @test
- * @bug 8151299 8164704
- * @modules jdk.incubator.httpclient
+ * @bug 8151299 8164704 8187044
+ * @modules jdk.incubator.httpclient java.management
  * @run testng jdk.incubator.httpclient/jdk.incubator.http.SelectorTest
  * @run testng jdk.incubator.httpclient/jdk.incubator.http.RawChannelTest
  * @run testng jdk.incubator.httpclient/jdk.incubator.http.ResponseHeadersTest
+ * @run main/othervm --add-reads jdk.incubator.httpclient=java.management jdk.incubator.httpclient/jdk.incubator.http.ConnectionPoolTest
  */
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/test/java/net/httpclient/whitebox/jdk.incubator.httpclient/jdk/incubator/http/ConnectionPoolTest.java	Fri Sep 01 18:18:09 2017 +0100
@@ -0,0 +1,252 @@
+/*
+ * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+package jdk.incubator.http;
+
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.lang.ref.Reference;
+import java.lang.ref.ReferenceQueue;
+import java.lang.ref.WeakReference;
+import java.net.Authenticator;
+import java.net.CookieManager;
+import java.net.InetSocketAddress;
+import java.net.ProxySelector;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLParameters;
+import jdk.incubator.http.internal.common.ByteBufferReference;
+
+/**
+ * @summary Verifies that the ConnectionPool won't prevent an HttpClient
+ *          from being GC'ed. Verifies that the ConnectionPool has at most
+ *          one CacheCleaner thread running.
+ * @bug 8187044
+ * @author danielfuchs
+ */
+public class ConnectionPoolTest {
+
+    static long getActiveCleaners() throws ClassNotFoundException {
+        // ConnectionPool.ACTIVE_CLEANER_COUNTER.get()
+        // ConnectionPoolTest.class.getModule().addReads(
+        //      Class.forName("java.lang.management.ManagementFactory").getModule());
+        return java.util.stream.Stream.of(ManagementFactory.getThreadMXBean()
+                .dumpAllThreads(false, false))
+              .filter(t -> t.getThreadName().startsWith("HTTP-Cache-cleaner"))
+              .count();
+    }
+
+    public static void main(String[] args) throws Exception {
+        testCacheCleaners();
+    }
+
+    public static void testCacheCleaners() throws Exception {
+        ConnectionPool pool = new ConnectionPool();
+        HttpClient client = new HttpClientStub(pool);
+        InetSocketAddress proxy = InetSocketAddress.createUnresolved("bar", 80);
+        System.out.println("Adding 10 connections to pool");
+        for (int i=0; i<10; i++) {
+            InetSocketAddress addr = InetSocketAddress.createUnresolved("foo"+i, 80);
+            HttpConnection c1 = new HttpConnectionStub(client, addr, proxy, true);
+            pool.returnToPool(c1);
+        }
+        while (getActiveCleaners() == 0) {
+            System.out.println("Waiting for cleaner to start");
+            Thread.sleep(10);
+        }
+        System.out.println("Active CacheCleaners: " + getActiveCleaners());
+        if (getActiveCleaners() > 1) {
+            throw new RuntimeException("Too many CacheCleaner active: "
+                    + getActiveCleaners());
+        }
+        System.out.println("Removing 9 connections from pool");
+        for (int i=0; i<9; i++) {
+            InetSocketAddress addr = InetSocketAddress.createUnresolved("foo"+i, 80);
+            HttpConnection c2 = pool.getConnection(true, addr, proxy);
+            if (c2 == null) {
+                throw new RuntimeException("connection not found for " + addr);
+            }
+        }
+        System.out.println("Active CacheCleaners: " + getActiveCleaners());
+        if (getActiveCleaners() != 1) {
+            throw new RuntimeException("Wrong number of CacheCleaner active: "
+                    + getActiveCleaners());
+        }
+        System.out.println("Removing last connection from pool");
+        for (int i=9; i<10; i++) {
+            InetSocketAddress addr = InetSocketAddress.createUnresolved("foo"+i, 80);
+            HttpConnection c2 = pool.getConnection(true, addr, proxy);
+            if (c2 == null) {
+                throw new RuntimeException("connection not found for " + addr);
+            }
+        }
+        System.out.println("Active CacheCleaners: " + getActiveCleaners()
+                + " (may be 0 or may still be 1)");
+        if (getActiveCleaners() > 1) {
+            throw new RuntimeException("Too many CacheCleaner active: "
+                    + getActiveCleaners());
+        }
+        InetSocketAddress addr = InetSocketAddress.createUnresolved("foo", 80);
+        HttpConnection c = new HttpConnectionStub(client, addr, proxy, true);
+        System.out.println("Adding/Removing one connection from pool 20 times in a loop");
+        for (int i=0; i<20; i++) {
+            pool.returnToPool(c);
+            HttpConnection c2 = pool.getConnection(true, addr, proxy);
+            if (c2 == null) {
+                throw new RuntimeException("connection not found for " + addr);
+            }
+            if (c2 != c) {
+                throw new RuntimeException("wrong connection found for " + addr);
+            }
+        }
+        if (getActiveCleaners() > 1) {
+            throw new RuntimeException("Too many CacheCleaner active: "
+                    + getActiveCleaners());
+        }
+        ReferenceQueue<HttpClient> queue = new ReferenceQueue<>();
+        WeakReference<HttpClient> weak = new WeakReference<>(client, queue);
+        System.gc();
+        Reference.reachabilityFence(pool);
+        client = null; pool = null; c = null;
+        while (true) {
+            long cleaners = getActiveCleaners();
+            System.out.println("Waiting for GC to release stub HttpClient;"
+                    + " active cache cleaners: " + cleaners);
+            System.gc();
+            Reference<?> ref = queue.remove(1000);
+            if (ref == weak) {
+                System.out.println("Stub HttpClient GC'ed");
+                break;
+            }
+        }
+        while (getActiveCleaners() > 0) {
+            System.out.println("Waiting for CacheCleaner to stop");
+            Thread.sleep(1000);
+        }
+        System.out.println("Active CacheCleaners: "
+                + getActiveCleaners());
+
+        if (getActiveCleaners() > 0) {
+            throw new RuntimeException("Too many CacheCleaner active: "
+                    + getActiveCleaners());
+        }
+    }
+    static <T> T error() {
+        throw new InternalError("Should not reach here: wrong test assumptions!");
+    }
+
+    // Emulates an HttpConnection that has a strong reference to its HttpClient.
+    static class HttpConnectionStub extends HttpConnection {
+
+        public HttpConnectionStub(HttpClient client,
+                InetSocketAddress address,
+                InetSocketAddress proxy,
+                boolean secured) {
+            super(address, null);
+            this.key = ConnectionPool.cacheKey(address, proxy);
+            this.address = address;
+            this.proxy = proxy;
+            this.secured = secured;
+            this.client = client;
+        }
+
+        InetSocketAddress proxy;
+        InetSocketAddress address;
+        boolean secured;
+        ConnectionPool.CacheKey key;
+        HttpClient client;
+
+        // All these return something
+        @Override boolean connected() {return true;}
+        @Override boolean isSecure() {return secured;}
+        @Override boolean isProxied() {return proxy!=null;}
+        @Override ConnectionPool.CacheKey cacheKey() {return key;}
+        @Override public void close() {}
+        @Override void shutdownInput() throws IOException {}
+        @Override void shutdownOutput() throws IOException {}
+        public String toString() {
+            return "HttpConnectionStub: " + address + " proxy: " + proxy;
+        }
+
+        // All these throw errors
+        @Override
+        public void connect() throws IOException, InterruptedException {error();}
+        @Override public CompletableFuture<Void> connectAsync() {return error();}
+        @Override SocketChannel channel() {return error();}
+        @Override void flushAsync() throws IOException {error();}
+        @Override
+        protected ByteBuffer readImpl() throws IOException {return error();}
+        @Override CompletableFuture<Void> whenReceivingResponse() {return error();}
+        @Override
+        long write(ByteBuffer[] buffers, int start, int number) throws IOException {
+            throw (Error)error();
+        }
+        @Override
+        long write(ByteBuffer buffer) throws IOException {throw (Error)error();}
+        @Override
+        void writeAsync(ByteBufferReference[] buffers) throws IOException {
+            error();
+        }
+        @Override
+        void writeAsyncUnordered(ByteBufferReference[] buffers)
+                throws IOException {
+            error();
+        }
+    }
+    // Emulates an HttpClient that has a strong reference to its connection pool.
+    static class HttpClientStub extends HttpClient {
+        public HttpClientStub(ConnectionPool pool) {
+            this.pool = pool;
+        }
+        final ConnectionPool pool;
+        @Override public Optional<CookieManager> cookieManager() {return error();}
+        @Override public HttpClient.Redirect followRedirects() {return error();}
+        @Override public Optional<ProxySelector> proxy() {return error();}
+        @Override public SSLContext sslContext() {return error();}
+        @Override public Optional<SSLParameters> sslParameters() {return error();}
+        @Override public Optional<Authenticator> authenticator() {return error();}
+        @Override public HttpClient.Version version() {return HttpClient.Version.HTTP_1_1;}
+        @Override public Executor executor() {return error();}
+        @Override
+        public <T> HttpResponse<T> send(HttpRequest req,
+                HttpResponse.BodyHandler<T> responseBodyHandler)
+                throws IOException, InterruptedException {
+            return error();
+        }
+        @Override
+        public <T> CompletableFuture<HttpResponse<T>> sendAsync(HttpRequest req,
+                HttpResponse.BodyHandler<T> responseBodyHandler) {
+            return error();
+        }
+        @Override
+        public <U, T> CompletableFuture<U> sendAsync(HttpRequest req,
+                HttpResponse.MultiProcessor<U, T> multiProcessor) {
+            return error();
+        }
+    }
+
+}