changeset 714:49e645ae6ee9

Updates to simple/prototype AsynchronousDatagramChannel implementation - attempting to join IPv4 multicast group with IPv6 enable fails with spurious IAE - read/receive timeout not working
author alanb
date Tue, 14 Oct 2008 13:20:25 +0100
parents e3022f78da3b
children 889050324044
files src/share/classes/java/nio/channels/AsynchronousDatagramChannel.java src/share/classes/sun/nio/ch/DatagramChannelImpl.java src/share/classes/sun/nio/ch/PendingFuture.java src/share/classes/sun/nio/ch/SimpleAsynchronousDatagramChannelImpl.java src/solaris/classes/sun/nio/ch/LinuxAsynchronousChannelProvider.java src/solaris/classes/sun/nio/ch/SolarisAsynchronousChannelProvider.java src/windows/classes/sun/nio/ch/WindowsAsynchronousChannelProvider.java test/java/nio/channels/AsynchronousDatagramChannel/Basic.java
diffstat 8 files changed, 101 insertions(+), 36 deletions(-) [+]
line wrap: on
line diff
--- a/src/share/classes/java/nio/channels/AsynchronousDatagramChannel.java	Tue Oct 14 10:33:31 2008 +0100
+++ b/src/share/classes/java/nio/channels/AsynchronousDatagramChannel.java	Tue Oct 14 13:20:25 2008 +0100
@@ -279,16 +279,6 @@
      * @throws  ClosedChannelException
      *          If this channel is closed
      *
-     * @throws  AsynchronousCloseException
-     *          If another thread closes this channel
-     *          while the connect operation is in progress
-     *
-     * @throws  ClosedByInterruptException
-     *          If another thread interrupts the current thread
-     *          while the connect operation is in progress, thereby
-     *          closing the channel and setting the current thread's
-     *          interrupt status
-     *
      * @throws  SecurityException
      *          If a security manager has been installed
      *          and it does not permit access to the given remote address
--- a/src/share/classes/sun/nio/ch/DatagramChannelImpl.java	Tue Oct 14 10:33:31 2008 +0100
+++ b/src/share/classes/sun/nio/ch/DatagramChannelImpl.java	Tue Oct 14 13:20:25 2008 +0100
@@ -111,8 +111,12 @@
     public DatagramChannelImpl(SelectorProvider sp, ProtocolFamily family) {
         super(sp);
         if ((family != StandardProtocolFamily.INET) &&
-            (family != StandardProtocolFamily.INET6)) {
-            throw new UnsupportedOperationException("Protocol family not supported");
+            (family != StandardProtocolFamily.INET6))
+        {
+            if (family == null)
+                throw new NullPointerException("'family' is null");
+            else
+                throw new UnsupportedOperationException("Protocol family not supported");
         }
         if (family == StandardProtocolFamily.INET6) {
             if (!Net.isIPv6Available()) {
--- a/src/share/classes/sun/nio/ch/PendingFuture.java	Tue Oct 14 10:33:31 2008 +0100
+++ b/src/share/classes/sun/nio/ch/PendingFuture.java	Tue Oct 14 13:20:25 2008 +0100
@@ -109,34 +109,36 @@
     /**
      * Sets the result, or a no-op if the result or exception is already set.
      */
-    void setResult(V res) {
+    boolean setResult(V res) {
         synchronized (this) {
             if (haveResult)
-                return;
+                return false;
             result = res;
             haveResult = true;
             if (timeoutTask != null)
                 timeoutTask.cancel(false);
             if (latch != null)
                 latch.countDown();
+            return true;
         }
     }
 
     /**
      * Sets the result, or a no-op if the result or exception is already set.
      */
-    void setFailure(Throwable x) {
+    boolean setFailure(Throwable x) {
         if (!(x instanceof IOException) && !(x instanceof SecurityException))
             x = new IOException(x);
         synchronized (this) {
             if (haveResult)
-                return;
+                return false;
             exc = x;
             haveResult = true;
             if (timeoutTask != null)
                 timeoutTask.cancel(false);
             if (latch != null)
                 latch.countDown();
+            return true;
         }
     }
 
--- a/src/share/classes/sun/nio/ch/SimpleAsynchronousDatagramChannelImpl.java	Tue Oct 14 10:33:31 2008 +0100
+++ b/src/share/classes/sun/nio/ch/SimpleAsynchronousDatagramChannelImpl.java	Tue Oct 14 13:20:25 2008 +0100
@@ -47,12 +47,14 @@
     private final AsynchronousChannelGroupImpl group;
     private final DatagramChannel dc;
 
-    SimpleAsynchronousDatagramChannelImpl(AsynchronousChannelGroupImpl group)
+    SimpleAsynchronousDatagramChannelImpl(ProtocolFamily family,
+                                          AsynchronousChannelGroupImpl group)
         throws IOException
     {
         super(group.provider());
         this.group = group;
-        this.dc = DatagramChannel.open();
+        this.dc = (family == null) ?
+            DatagramChannel.open() : DatagramChannel.open(family);
     }
 
     @Override
@@ -175,6 +177,7 @@
 
         CompletedFuture<Integer,A> result;
         try {
+            // assume it will not block
             int n = dc.send(src, target);
             result = CompletedFuture.withResult(this, n, attachment);
         } catch (IOException ioe) {
@@ -198,6 +201,7 @@
 
         CompletedFuture<Integer,A> result;
         try {
+            // assume it will not block
             int n = dc.write(src);
             result = CompletedFuture.withResult(this, n, attachment);
         } catch (IOException ioe) {
@@ -207,6 +211,25 @@
         return result;
     }
 
+    @SuppressWarnings("unchecked")
+    private <V,A> Future<?> scheduleTimeout(final PendingFuture<V,? super A> result,
+                                            long timeout, TimeUnit unit)
+    {
+        if (timeout > 0L) {
+            Runnable readTimeoutTask = new Runnable() {
+                public void run() {
+                    if (result.setFailure(new InterruptedByTimeoutException()))
+                        Invoker.invoke(result.handler(), (AbstractFuture<V,A>)result);
+                }
+            };
+            Future<?> timeoutTask = group.schedule(readTimeoutTask, timeout, unit);
+            result.setTimeoutTask(timeoutTask);
+            return timeoutTask;
+        } else {
+            return null;
+        }
+    }
+
     @Override
     public <A> Future<SocketAddress> receive(final ByteBuffer dst,
                                              long timeout,
@@ -235,6 +258,7 @@
             new PendingFuture<SocketAddress,A>(this, handler, attachment);
         Runnable task = new Runnable() {
             public void run() {
+                boolean completedByMe = false;
                 try {
                     SocketAddress remote;
                     if (acc == null) {
@@ -254,18 +278,22 @@
                             throw (IOException)cause;
                         }
                     }
-                    result.setResult(remote);
+                    completedByMe = result.setResult(remote);
                 } catch (Throwable x) {
                     if (x instanceof ClosedChannelException)
                         x = new AsynchronousCloseException();
-                    result.setFailure(x);
+                    completedByMe = result.setFailure(x);
                 }
-                Invoker.invokeDirect(handler, result);
+                if (completedByMe)
+                    Invoker.invokeDirect(handler, result);
             }
         };
+        Future<?> timeoutTask = scheduleTimeout(result, timeout, unit);
         try {
             group.execute(task);
         } catch (RejectedExecutionException ree) {
+            if (timeoutTask != null)
+                timeoutTask.cancel(false);
             throw new ShutdownChannelGroupException();
         }
         return result;
@@ -300,20 +328,25 @@
             new PendingFuture<Integer,A>(this, handler, attachment);
         Runnable task = new Runnable() {
             public void run() {
+                boolean completedByMe = false;
                 try {
                     int n = dc.read(dst);
-                    result.setResult(n);
+                    completedByMe = result.setResult(n);
                 } catch (Throwable x) {
                     if (x instanceof ClosedChannelException)
                         x = new AsynchronousCloseException();
-                    result.setFailure(x);
+                    completedByMe = result.setFailure(x);
                 }
-                Invoker.invokeDirect(handler, result);
+                if (completedByMe)
+                    Invoker.invokeDirect(handler, result);
             }
         };
+        Future<?> timeoutTask = scheduleTimeout(result, timeout, unit);
         try {
             group.execute(task);
         } catch (RejectedExecutionException ree) {
+            if (timeoutTask != null)
+                timeoutTask.cancel(false);
             throw new ShutdownChannelGroupException();
         }
         return result;
--- a/src/solaris/classes/sun/nio/ch/LinuxAsynchronousChannelProvider.java	Tue Oct 14 10:33:31 2008 +0100
+++ b/src/solaris/classes/sun/nio/ch/LinuxAsynchronousChannelProvider.java	Tue Oct 14 13:20:25 2008 +0100
@@ -89,6 +89,6 @@
                                                                        AsynchronousChannelGroup group)
         throws IOException
     {
-        return new SimpleAsynchronousDatagramChannelImpl(toPort(group));
+        return new SimpleAsynchronousDatagramChannelImpl(family, toPort(group));
     }
 }
--- a/src/solaris/classes/sun/nio/ch/SolarisAsynchronousChannelProvider.java	Tue Oct 14 10:33:31 2008 +0100
+++ b/src/solaris/classes/sun/nio/ch/SolarisAsynchronousChannelProvider.java	Tue Oct 14 13:20:25 2008 +0100
@@ -92,6 +92,6 @@
                                                                        AsynchronousChannelGroup group)
         throws IOException
     {
-        return new SimpleAsynchronousDatagramChannelImpl(toEventPort(group));
+        return new SimpleAsynchronousDatagramChannelImpl(family, toEventPort(group));
     }
 }
--- a/src/windows/classes/sun/nio/ch/WindowsAsynchronousChannelProvider.java	Tue Oct 14 10:33:31 2008 +0100
+++ b/src/windows/classes/sun/nio/ch/WindowsAsynchronousChannelProvider.java	Tue Oct 14 13:20:25 2008 +0100
@@ -91,6 +91,6 @@
                                                                        AsynchronousChannelGroup group)
         throws IOException
     {
-        return new SimpleAsynchronousDatagramChannelImpl(toIocp(group));
+        return new SimpleAsynchronousDatagramChannelImpl(family, toIocp(group));
     }
 }
--- a/test/java/nio/channels/AsynchronousDatagramChannel/Basic.java	Tue Oct 14 10:33:31 2008 +0100
+++ b/test/java/nio/channels/AsynchronousDatagramChannel/Basic.java	Tue Oct 14 13:20:25 2008 +0100
@@ -64,10 +64,10 @@
 
         // Test: datagram packet not received immediately
         dst.clear();
-        final CountDownLatch l1 = new CountDownLatch(1);
+        final CountDownLatch latch = new CountDownLatch(1);
         ch.receive(dst, null, new CompletionHandler<SocketAddress,Void>() {
             public void completed(SocketAddress source, Void att) {
-                l1.countDown();
+                latch.countDown();
             }
             public void failed (Throwable exc, Void att) {
             }
@@ -76,11 +76,30 @@
         });
         Thread.sleep(2000);
         sender.send(ByteBuffer.wrap(msg), sa);
-        l1.await(2, TimeUnit.SECONDS);
+        latch.await(2, TimeUnit.SECONDS);  // wait for completion handler
+
+        // Test: timeout
+        dst.clear();
+        final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
+        ch.receive(dst, 2, TimeUnit.SECONDS, null, new CompletionHandler<SocketAddress,Void>() {
+            public void completed(SocketAddress source, Void att) {
+            }
+            public void failed (Throwable exc, Void att) {
+                exception.set(exc);
+            }
+            public void cancelled(Void att) {
+            }
+        });
+        Throwable result;
+        while ((result = exception.get()) == null) {
+            Thread.sleep(100);
+        }
+        if (!(result instanceof InterruptedByTimeoutException))
+            throw new RuntimeException("InterruptedByTimeoutException expected");
 
         // AsynchronousCloseException
-        dst.clear();
-        final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
+        dst = ByteBuffer.allocateDirect(100);
+        exception.set(null);
         ch.receive(dst, null, new CompletionHandler<SocketAddress,Void>() {
             public void completed(SocketAddress source, Void att) {
             }
@@ -91,7 +110,6 @@
             }
         });
         ch.close();
-        Throwable result;
         while ((result = exception.get()) == null) {
             Thread.sleep(100);
         }
@@ -150,11 +168,30 @@
         sender.send(ByteBuffer.wrap(msg), sa);
         l1.await(2, TimeUnit.SECONDS);
 
+        // Test: timeout
+        dst.clear();
+        final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
+        ch.read(dst, 2, TimeUnit.SECONDS, null, new CompletionHandler<Integer,Void>() {
+            public void completed(Integer bytesRead, Void att) {
+            }
+            public void failed (Throwable exc, Void att) {
+                exception.set(exc);
+            }
+            public void cancelled(Void att) {
+            }
+        });
+        Throwable result;
+        while ((result = exception.get()) == null) {
+            Thread.sleep(100);
+        }
+        if (!(result instanceof InterruptedByTimeoutException))
+            throw new RuntimeException("InterruptedByTimeoutException expected");
+
         // AsynchronousCloseException
         dst.clear();
-        final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
+        exception.set(null);
         ch.read(dst, null, new CompletionHandler<Integer,Void>() {
-            public void completed(Integer source, Void att) {
+            public void completed(Integer bytesRead, Void att) {
             }
             public void failed (Throwable exc, Void att) {
                 exception.set(exc);
@@ -163,7 +200,6 @@
             }
         });
         ch.close();
-        Throwable result;
         while ((result = exception.get()) == null) {
             Thread.sleep(100);
         }