changeset 699:44edf677e97e

Added prototype AsynchronousDatagramChannel to allow for test development Minor bug fixes: - ensure that read/write flag is reset when non IOException thrown - ensure that threads in cached thread pool are recognized as pooled threads
author alanb
date Tue, 07 Oct 2008 18:38:36 +0100
parents cbf58d8a3fec
children 693805fade68
files make/java/nio/FILES_java.gmk src/share/classes/sun/nio/ch/AsynchronousChannelGroupImpl.java src/share/classes/sun/nio/ch/SimpleAsynchronousDatagramChannelImpl.java src/solaris/classes/sun/nio/ch/LinuxAsynchronousChannelProvider.java src/solaris/classes/sun/nio/ch/SolarisAsynchronousChannelProvider.java src/solaris/classes/sun/nio/ch/UnixAsynchronousServerSocketChannelImpl.java src/solaris/classes/sun/nio/ch/UnixAsynchronousSocketChannelImpl.java src/windows/classes/sun/nio/ch/WindowsAsynchronousChannelProvider.java test/java/nio/channels/AsynchronousDatagramChannel/Basic.java
diffstat 9 files changed, 745 insertions(+), 26 deletions(-) [+]
line wrap: on
line diff
--- a/make/java/nio/FILES_java.gmk	Tue Oct 07 18:00:58 2008 +0100
+++ b/make/java/nio/FILES_java.gmk	Tue Oct 07 18:38:36 2008 +0100
@@ -56,7 +56,6 @@
 	java/nio/channels/Selector.java \
 	java/nio/channels/SelectionKey.java \
 	java/nio/channels/ServerSocketChannel.java \
-	java/nio/channels/ShutdownChannelGroupException.java \
 	java/nio/channels/SocketChannel.java \
 	java/nio/channels/WritableByteChannel.java \
 	\
@@ -204,6 +203,7 @@
 	sun/nio/ch/SelChImpl.java \
 	sun/nio/ch/ServerSocketAdaptor.java \
 	sun/nio/ch/ServerSocketChannelImpl.java \
+	sun/nio/ch/SimpleAsynchronousDatagramChannelImpl.java \
         sun/nio/ch/SinkChannelImpl.java \
 	sun/nio/ch/SocketAdaptor.java \
 	sun/nio/ch/SocketChannelImpl.java \
@@ -386,6 +386,7 @@
 	java/nio/channels/NotYetConnectedException.java \
 	java/nio/channels/OverlappingFileLockException.java \
 	java/nio/channels/ReadPendingException.java \
+	java/nio/channels/ShutdownChannelGroupException.java \
 	java/nio/channels/UnresolvedAddressException.java \
 	java/nio/channels/UnsupportedAddressTypeException.java \
 	java/nio/channels/WritePendingException.java \
--- a/src/share/classes/sun/nio/ch/AsynchronousChannelGroupImpl.java	Tue Oct 07 18:00:58 2008 +0100
+++ b/src/share/classes/sun/nio/ch/AsynchronousChannelGroupImpl.java	Tue Oct 07 18:38:36 2008 +0100
@@ -189,7 +189,7 @@
         if (pool.isFixedThreadPool()) {
             executeOnHandlerTask(task);
         } else {
-            pool.executor().execute(task);
+            pool.executor().execute(bindToGroup(task));
         }
     }
 
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/share/classes/sun/nio/ch/SimpleAsynchronousDatagramChannelImpl.java	Tue Oct 07 18:38:36 2008 +0100
@@ -0,0 +1,369 @@
+/*
+ * Copyright 2007-2008 Sun Microsystems, Inc.  All Rights Reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.  Sun designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Sun in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
+ * CA 95054 USA or visit www.sun.com if you need additional information or
+ * have any questions.
+ */
+
+package sun.nio.ch;
+
+import java.nio.ByteBuffer;
+import java.nio.channels.*;
+import java.net.*;
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.security.AccessController;
+import java.security.AccessControlContext;
+import java.security.PrivilegedExceptionAction;
+import java.security.PrivilegedActionException;
+
+/**
+ * A prototype implementation of AsynchronousDatagramChannel
+ */
+
+class SimpleAsynchronousDatagramChannelImpl
+    extends AsynchronousDatagramChannel implements MulticastChannel, Groupable
+{
+    private final AsynchronousChannelGroupImpl group;
+    private final DatagramChannel dc;
+    private final AtomicBoolean reading = new AtomicBoolean();
+
+    SimpleAsynchronousDatagramChannelImpl(AsynchronousChannelGroupImpl group)
+        throws IOException
+    {
+        super(group.provider());
+        this.group = group;
+        this.dc = DatagramChannel.open();
+    }
+
+    private void enableReading() {
+        reading.set(false);
+    }
+
+    private void checkAndSetReading() {
+        if (!reading.compareAndSet(false, true))
+            throw new ReadPendingException();
+    }
+
+    @Override
+    public AsynchronousChannelGroupImpl group() {
+        return group;
+    }
+
+    @Override
+    public boolean isOpen() {
+        return dc.isOpen();
+    }
+
+    @Override
+    public void close() throws IOException {
+        dc.close();
+    }
+
+    @Override
+    public AsynchronousDatagramChannel connect(SocketAddress remote)
+        throws IOException
+    {
+        dc.connect(remote);
+        return this;
+    }
+
+    @Override
+    public AsynchronousDatagramChannel disconnect() throws IOException {
+        dc.disconnect();
+        return this;
+    }
+
+    private static class WrappedMembershipKey extends MembershipKey {
+        private final MulticastChannel channel;
+        private final MembershipKey key;
+
+        WrappedMembershipKey(MulticastChannel channel, MembershipKey key) {
+            this.channel = channel;
+            this.key = key;
+        }
+
+        @Override
+        public boolean isValid() {
+            return key.isValid();
+        }
+
+        @Override
+        public void drop() throws IOException {
+            key.drop();
+        }
+
+        @Override
+        public MulticastChannel getChannel() {
+            return channel;
+        }
+
+        @Override
+        public InetAddress getGroup() {
+            return key.getGroup();
+        }
+
+        @Override
+        public NetworkInterface getNetworkInterface() {
+            return key.getNetworkInterface();
+        }
+
+        @Override
+        public InetAddress getSourceAddress() {
+            return key.getSourceAddress();
+        }
+
+        @Override
+        public MembershipKey block(InetAddress toBlock) throws IOException {
+            key.block(toBlock);
+            return this;
+        }
+
+        @Override
+        public MembershipKey unblock(InetAddress toUnblock) throws IOException {
+            key.unblock(toUnblock);
+            return this;
+        }
+
+        @Override
+        public String toString() {
+            return key.toString();
+        }
+    }
+
+    @Override
+    public MembershipKey join(InetAddress group,
+                              NetworkInterface interf)
+        throws IOException
+    {
+        MembershipKey key = ((MulticastChannel)dc).join(group, interf);
+        return new WrappedMembershipKey(this, key);
+    }
+
+    @Override
+    public MembershipKey join(InetAddress group,
+                              NetworkInterface interf,
+                              InetAddress source)
+        throws IOException
+    {
+        MembershipKey key = ((MulticastChannel)dc).join(group, interf, source);
+        return new WrappedMembershipKey(this, key);
+    }
+
+    @Override
+    public <A> Future<Integer> send(ByteBuffer src,
+                                    SocketAddress target,
+                                    long timeout,
+                                    TimeUnit unit,
+                                    A attachment,
+                                    CompletionHandler<Integer,? super A> handler)
+    {
+        if (timeout < 0L)
+            throw new IllegalArgumentException("Negative timeout");
+        if (unit == null)
+            throw new NullPointerException();
+
+        CompletedFuture<Integer,A> result;
+        try {
+            int n = dc.send(src, target);
+            result = CompletedFuture.withResult(this, n, attachment);
+        } catch (IOException ioe) {
+            result = CompletedFuture.withFailure(this, ioe, attachment);
+        }
+        Invoker.invoke(handler, result);
+        return result;
+    }
+
+    @Override
+    public <A> Future<Integer> write(ByteBuffer src,
+                                     long timeout,
+                                     TimeUnit unit,
+                                     A attachment,
+                                     CompletionHandler<Integer,? super A> handler)
+    {
+        if (timeout < 0L)
+            throw new IllegalArgumentException("Negative timeout");
+        if (unit == null)
+            throw new NullPointerException();
+
+        CompletedFuture<Integer,A> result;
+        try {
+            int n = dc.write(src);
+            result = CompletedFuture.withResult(this, n, attachment);
+        } catch (IOException ioe) {
+            result = CompletedFuture.withFailure(this, ioe, attachment);
+        }
+        Invoker.invoke(handler, result);
+        return result;
+    }
+
+    @Override
+    public <A> Future<SocketAddress> receive(final ByteBuffer dst,
+                                             long timeout,
+                                             TimeUnit unit,
+                                             A attachment,
+                                             final CompletionHandler<SocketAddress,? super A> handler)
+    {
+        if (dst == null)
+            throw new NullPointerException();
+        if (timeout < 0L)
+            throw new IllegalArgumentException("Negative timeout");
+        if (unit == null)
+            throw new NullPointerException();
+
+        // complete immediately if channel closed
+        if (!isOpen()) {
+            CompletedFuture<SocketAddress,A> result = CompletedFuture.withFailure(this,
+                new ClosedChannelException(), attachment);
+            Invoker.invoke(handler, result);
+            return result;
+        }
+
+        final AccessControlContext acc = (System.getSecurityManager() == null) ?
+            null : AccessController.getContext();
+        final PendingFuture<SocketAddress,A> result =
+            new PendingFuture<SocketAddress,A>(this, handler, attachment);
+        checkAndSetReading();
+        Runnable task = new Runnable() {
+            public void run() {
+                try {
+                    SocketAddress remote;
+                    if (acc == null) {
+                        remote = dc.receive(dst);
+                    } else {
+                        // receive in caller context
+                        try {
+                            remote = AccessController.doPrivileged(
+                                new PrivilegedExceptionAction<SocketAddress>() {
+                                    public SocketAddress run() throws IOException {
+                                        return dc.receive(dst);
+                                    }}, acc);
+                        } catch (PrivilegedActionException pae) {
+                            Exception cause = pae.getException();
+                            if (cause instanceof SecurityException)
+                                throw (SecurityException)cause;
+                            throw (IOException)cause;
+                        }
+                    }
+                    enableReading();
+                    result.setResult(remote);
+                } catch (Throwable x) {
+                    enableReading();
+                    result.setFailure(x);
+                }
+                Invoker.invokeDirect(handler, result);
+            }
+        };
+        try {
+            group.execute(task);
+        } catch (RejectedExecutionException ree) {
+            throw new ShutdownChannelGroupException();
+        }
+        return result;
+    }
+
+    @Override
+    public <A> Future<Integer> read(final ByteBuffer dst,
+                                    long timeout,
+                                    TimeUnit unit,
+                                    A attachment,
+                                    final CompletionHandler<Integer,? super A> handler)
+    {
+        if (dst == null)
+            throw new NullPointerException();
+        if (timeout < 0L)
+            throw new IllegalArgumentException("Negative timeout");
+        if (unit == null)
+            throw new NullPointerException();
+        // another thread may disconnect before read is initiated
+        if (!dc.isConnected())
+            throw new NotYetConnectedException();
+
+        // complete immediately if channel closed
+        if (!isOpen()) {
+            CompletedFuture<Integer,A> result = CompletedFuture.withFailure(this,
+                new ClosedChannelException(), attachment);
+            Invoker.invoke(handler, result);
+            return result;
+        }
+
+        final PendingFuture<Integer,A> result =
+            new PendingFuture<Integer,A>(this, handler, attachment);
+        checkAndSetReading();
+        Runnable task = new Runnable() {
+            public void run() {
+                try {
+                    int n = dc.read(dst);
+                    enableReading();
+                    result.setResult(n);
+                } catch (Throwable x) {
+                    enableReading();
+                    result.setFailure(x);
+                }
+                Invoker.invokeDirect(handler, result);
+            }
+        };
+        try {
+            group.execute(task);
+        } catch (RejectedExecutionException ree) {
+            throw new ShutdownChannelGroupException();
+        }
+        return result;
+    }
+
+    @Override
+    public  AsynchronousDatagramChannel bind(SocketAddress local)
+        throws IOException
+    {
+        dc.bind(local);
+        return this;
+    }
+
+    @Override
+    public SocketAddress getLocalAddress() throws IOException {
+        return dc.getLocalAddress();
+    }
+
+    @Override
+    public <T> AsynchronousDatagramChannel setOption(SocketOption<T> name, T value)
+        throws IOException
+    {
+        dc.setOption(name, value);
+        return this;
+    }
+
+    @Override
+    public  <T> T getOption(SocketOption<T> name) throws IOException {
+        return dc.getOption(name);
+    }
+
+    @Override
+    public Set<SocketOption<?>> supportedOptions() {
+        return dc.supportedOptions();
+    }
+
+    @Override
+    public SocketAddress getRemoteAddress() throws IOException {
+        return dc.getRemoteAddress();
+    }
+}
--- a/src/solaris/classes/sun/nio/ch/LinuxAsynchronousChannelProvider.java	Tue Oct 07 18:00:58 2008 +0100
+++ b/src/solaris/classes/sun/nio/ch/LinuxAsynchronousChannelProvider.java	Tue Oct 07 18:38:36 2008 +0100
@@ -89,6 +89,6 @@
                                                                        AsynchronousChannelGroup group)
         throws IOException
     {
-        throw new RuntimeException();
+        return new SimpleAsynchronousDatagramChannelImpl(toPort(group));
     }
 }
--- a/src/solaris/classes/sun/nio/ch/SolarisAsynchronousChannelProvider.java	Tue Oct 07 18:00:58 2008 +0100
+++ b/src/solaris/classes/sun/nio/ch/SolarisAsynchronousChannelProvider.java	Tue Oct 07 18:38:36 2008 +0100
@@ -92,6 +92,6 @@
                                                                        AsynchronousChannelGroup group)
         throws IOException
     {
-        throw new RuntimeException();
+        return new SimpleAsynchronousDatagramChannelImpl(toEventPort(group));
     }
 }
--- a/src/solaris/classes/sun/nio/ch/UnixAsynchronousServerSocketChannelImpl.java	Tue Oct 07 18:00:58 2008 +0100
+++ b/src/solaris/classes/sun/nio/ch/UnixAsynchronousServerSocketChannelImpl.java	Tue Oct 07 18:38:36 2008 +0100
@@ -141,7 +141,7 @@
             // connection accepted
             accepted = true;
 
-        } catch (IOException x) {
+        } catch (Throwable x) {
             if (x instanceof ClosedChannelException)
                 x = new AsynchronousCloseException();
             enableAccept();
@@ -270,7 +270,7 @@
                 port.startPoll(fdVal, Port.POLLIN);
                 return result;
             }
-        } catch (IOException x) {
+        } catch (Throwable x) {
             // accept failed
             if (x instanceof ClosedChannelException)
                 x = new AsynchronousCloseException();
@@ -284,9 +284,7 @@
             try {
                 AsynchronousSocketChannel ch = finishAccept(newfd, isaa[0], null);
                 result = CompletedFuture.withResult(this, ch, attachment);
-            } catch (IOException x) {
-                result = CompletedFuture.withFailure(this, x, attachment);
-            } catch (SecurityException x) {
+            } catch (Throwable x) {
                 result = CompletedFuture.withFailure(this, x, attachment);
             }
         }
--- a/src/solaris/classes/sun/nio/ch/UnixAsynchronousSocketChannelImpl.java	Tue Oct 07 18:00:58 2008 +0100
+++ b/src/solaris/classes/sun/nio/ch/UnixAsynchronousSocketChannelImpl.java	Tue Oct 07 18:38:36 2008 +0100
@@ -28,9 +28,7 @@
 import java.nio.channels.*;
 import java.nio.ByteBuffer;
 import java.net.*;
-import java.util.*;
 import java.util.concurrent.*;
-import java.util.concurrent.locks.*;
 import java.io.IOException;
 import java.io.FileDescriptor;
 
@@ -270,25 +268,25 @@
     private void finishConnect(PendingFuture<Void,Object> result,
                                boolean invokeDirect)
     {
-        IOException ioe = null;
+        Throwable e = null;
         try {
             begin();
             checkConnect(fdVal);
             setConnected();
             result.setResult(null);
-        } catch (IOException x) {
+        } catch (Throwable x) {
             if (x instanceof ClosedChannelException)
                 x = new AsynchronousCloseException();
-            ioe = x;
+            e = x;
         } finally {
             end();
         }
-        if (ioe != null) {
+        if (e != null) {
             // close channel if connection cannot be established
             try {
                 close();
             } catch (IOException ignore) { }
-            result.setFailure(ioe);
+            result.setFailure(e);
         }
         if (invokeDirect)
             Invoker.invoke(result.handler(), result);
@@ -327,7 +325,7 @@
         }
 
         AbstractFuture<Void,A> result = null;
-        IOException ioe = null;
+        Throwable e = null;
         try {
             begin();
             int n = Net.connect(fd, isa.getAddress(), isa.getPort());
@@ -339,20 +337,20 @@
             }
             setConnected();
             result = CompletedFuture.withResult(this, null, attachment);
-        } catch (IOException x) {
+        } catch (Throwable x) {
             if (x instanceof ClosedChannelException)
                 x = new AsynchronousCloseException();
-            ioe = x;
+            e = x;
         } finally {
             end();
         }
 
         // close channel if connect fails
-        if (ioe != null) {
+        if (e != null) {
             try {
                 close();
             } catch (IOException ignore) { }
-            result = CompletedFuture.withFailure(this, ioe, attachment);
+            result = CompletedFuture.withFailure(this, e, attachment);
         }
 
         Invoker.invoke(handler, result);
@@ -400,7 +398,7 @@
                 result.setResult(Integer.valueOf(n));
             }
 
-        } catch (IOException x) {
+        } catch (Throwable x) {
             enableReading();
             if (x instanceof ClosedChannelException)
                 x = new AsynchronousCloseException();
@@ -487,7 +485,7 @@
                 result = CompletedFuture
                     .withResult(this, Integer.valueOf(n), attachment);
             }
-        } catch (IOException x) {
+        } catch (Throwable x) {
             enableReading();
             if (x instanceof ClosedChannelException)
                 x = new AsynchronousCloseException();
@@ -537,7 +535,7 @@
                 result.setResult(Integer.valueOf(n));
             }
 
-        } catch (IOException x) {
+        } catch (Throwable x) {
             enableWriting();
             if (x instanceof ClosedChannelException)
                 x = new AsynchronousCloseException();
@@ -620,7 +618,7 @@
                 result = CompletedFuture
                     .withResult(this, Integer.valueOf(n), attachment);
             }
-        } catch (IOException x) {
+        } catch (Throwable x) {
             enableWriting();
             if (x instanceof ClosedChannelException)
                 x = new AsynchronousCloseException();
--- a/src/windows/classes/sun/nio/ch/WindowsAsynchronousChannelProvider.java	Tue Oct 07 18:00:58 2008 +0100
+++ b/src/windows/classes/sun/nio/ch/WindowsAsynchronousChannelProvider.java	Tue Oct 07 18:38:36 2008 +0100
@@ -91,6 +91,6 @@
                                                                        AsynchronousChannelGroup group)
         throws IOException
     {
-        throw new RuntimeException();
+        return new SimpleAsynchronousDatagramChannelImpl(toIocp(group));
     }
 }
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/test/java/nio/channels/AsynchronousDatagramChannel/Basic.java	Tue Oct 07 18:38:36 2008 +0100
@@ -0,0 +1,353 @@
+/*
+ * Copyright 2007-2008 Sun Microsystems, Inc.  All Rights Reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
+ * CA 95054 USA or visit www.sun.com if you need additional information or
+ * have any questions.
+ */
+
+/* @test
+ * @bug 4527345
+ * @summary Unit test for AsynchronousDatagramChannel
+ */
+
+import java.nio.ByteBuffer;
+import java.nio.channels.*;
+import java.net.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+public class Basic {
+
+    public static void main(String[] args) throws Exception {
+        doReceiveTests();
+        doReadTests();
+        doSendTests();
+        doWriteTests();
+        doMulticastTests();
+    }
+
+    // basic receive tests
+    static void doReceiveTests() throws Exception {
+        final byte[] msg = "hello".getBytes();
+
+        AsynchronousDatagramChannel ch = AsynchronousDatagramChannel.open()
+            .bind(new InetSocketAddress(0));
+        int port = ((InetSocketAddress)(ch.getLocalAddress())).getPort();
+        InetAddress rh = InetAddress.getLocalHost();
+        final SocketAddress sa = new InetSocketAddress(rh, port);
+
+        DatagramChannel sender = DatagramChannel.open();
+        ByteBuffer dst = ByteBuffer.allocateDirect(100);
+
+        // Test: datagram packet received immediately
+        sender.send(ByteBuffer.wrap(msg), sa);
+        dst.clear();
+        ch.receive(dst).get(1, TimeUnit.SECONDS);
+        if (dst.flip().remaining() != msg.length)
+            throw new RuntimeException("Unexpected number of bytes read");
+
+        // Test: datagram packet not received immediately
+        dst.clear();
+        final CountDownLatch l1 = new CountDownLatch(1);
+        ch.receive(dst, null, new CompletionHandler<SocketAddress,Void>() {
+            public void completed(SocketAddress source, Void att) {
+                l1.countDown();
+            }
+            public void failed (Throwable exc, Void att) {
+            }
+            public void cancelled(Void att) {
+            }
+        });
+        Thread.sleep(2000);
+        sender.send(ByteBuffer.wrap(msg), sa);
+        l1.await(2, TimeUnit.SECONDS);
+
+        // ReadPendingException
+        dst.clear();
+        final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
+        ch.receive(dst, null, new CompletionHandler<SocketAddress,Void>() {
+            public void completed(SocketAddress source, Void att) {
+            }
+            public void failed (Throwable exc, Void att) {
+                exception.set(exc);
+            }
+            public void cancelled(Void att) {
+            }
+        });
+        try {
+            ch.receive(ByteBuffer.allocate(100));
+            throw new RuntimeException("ReadPendingExcepiton expected");
+        } catch (ReadPendingException e) { }
+
+        // AsynchronousCloseException
+        ch.close();
+        Throwable result;
+        while ((result = exception.get()) == null) {
+            Thread.sleep(100);
+        }
+        if (!(result instanceof AsynchronousCloseException))
+            throw new RuntimeException("AsynchronousCloseException expected");
+
+        // done
+        sender.close();
+    }
+
+    // basic read tests
+    static void doReadTests() throws Exception {
+        final byte[] msg = "hello".getBytes();
+
+        AsynchronousDatagramChannel ch = AsynchronousDatagramChannel.open()
+            .bind(new InetSocketAddress(0));
+        int port = ((InetSocketAddress)(ch.getLocalAddress())).getPort();
+        InetAddress lh = InetAddress.getLocalHost();
+        final SocketAddress sa = new InetSocketAddress(lh, port);
+
+        DatagramChannel sender = DatagramChannel.open();
+        ByteBuffer dst = ByteBuffer.allocateDirect(100);
+
+        // Test: not connected
+        try {
+            ch.read(dst);
+            throw new RuntimeException("NotYetConnectedException expected");
+        } catch (NotYetConnectedException e) {
+        }
+
+        // connect the channel
+        sender.bind(new InetSocketAddress(0));
+        ch.connect(new InetSocketAddress(lh,
+                ((InetSocketAddress)(sender.getLocalAddress())).getPort()));
+
+        // Test: datagram packet received immediately
+        sender.send(ByteBuffer.wrap(msg), sa);
+        dst.clear();
+        ch.read(dst).get(1, TimeUnit.SECONDS);
+        if (dst.flip().remaining() != msg.length)
+            throw new RuntimeException("Unexpected number of bytes read");
+
+        // Test: datagram packet not received immediately
+        dst.clear();
+        final CountDownLatch l1 = new CountDownLatch(1);
+        ch.read(dst, null, new CompletionHandler<Integer,Void>() {
+            public void completed(Integer bytesRead, Void att) {
+                l1.countDown();
+            }
+            public void failed (Throwable exc, Void att) {
+            }
+            public void cancelled(Void att) {
+            }
+        });
+        Thread.sleep(2000);
+        sender.send(ByteBuffer.wrap(msg), sa);
+        l1.await(2, TimeUnit.SECONDS);
+
+        // ReadPendingException
+        dst.clear();
+        final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
+        ch.read(dst, null, new CompletionHandler<Integer,Void>() {
+            public void completed(Integer source, Void att) {
+            }
+            public void failed (Throwable exc, Void att) {
+                exception.set(exc);
+            }
+            public void cancelled(Void att) {
+            }
+        });
+        try {
+            ch.read(ByteBuffer.allocate(100));
+            throw new RuntimeException("ReadPendingExcepiton expected");
+        } catch (ReadPendingException e) { }
+
+        // AsynchronousCloseException
+        ch.close();
+        Throwable result;
+        while ((result = exception.get()) == null) {
+            Thread.sleep(100);
+        }
+        if (!(result instanceof AsynchronousCloseException))
+            throw new RuntimeException("AsynchronousCloseException expected");
+
+        // done
+        sender.close();
+    }
+
+    // basic send tests
+    static void doSendTests() throws Exception {
+        final byte[] msg = "hello".getBytes();
+
+        DatagramChannel reader = DatagramChannel.open()
+            .bind(new InetSocketAddress(0));
+        int port = ((InetSocketAddress)(reader.getLocalAddress())).getPort();
+        InetAddress rh = InetAddress.getLocalHost();
+        SocketAddress sa = new InetSocketAddress(rh, port);
+
+        AsynchronousDatagramChannel ch = AsynchronousDatagramChannel.open();
+
+        // Test: send datagram packet to reader
+        int bytesSent = ch.send(ByteBuffer.wrap(msg), sa).get();
+        if (bytesSent != msg.length)
+            throw new RuntimeException("Unexpected number of bytes sent");
+
+        // check received
+        ByteBuffer dst = ByteBuffer.allocateDirect(100);
+        reader.receive(dst);
+        dst.flip();
+        if (dst.remaining() != msg.length)
+            throw new RuntimeException("Unexpected number of bytes received");
+
+        // Test: send datagram packet to reader and check completion handler
+        // is invoked
+        final CountDownLatch l2 = new CountDownLatch(1);
+        ch.send(ByteBuffer.wrap(msg), sa, null, new CompletionHandler<Integer,Void>() {
+            public void completed(Integer bytesSent, Void att) {
+                if (bytesSent != msg.length)
+                    throw new RuntimeException("Unexpected number of bytes received");
+                l2.countDown();
+            }
+            public void failed (Throwable exc, Void att) {
+            }
+            public void cancelled(Void att) {
+            }
+        });
+        l2.await(5, TimeUnit.SECONDS);
+
+        // check received
+        dst.clear();
+        reader.receive(dst);
+        dst.flip();
+        if (dst.remaining() != msg.length)
+            throw new RuntimeException("Unexpected number of bytes received");
+
+        // Test: check that failed method is invoked
+        ch.close();
+        final CountDownLatch l3 = new CountDownLatch(1);
+        ch.send(ByteBuffer.wrap(msg), sa, null, new CompletionHandler<Integer,Void>() {
+            public void completed(Integer bytesSent, Void att) {
+                throw new RuntimeException("completed method invoked");
+            }
+            public void failed (Throwable exc, Void att) {
+                if (exc instanceof ClosedChannelException) {
+                    l3.countDown();
+                } else {
+                    throw new RuntimeException(exc);
+                }
+            }
+            public void cancelled(Void att) {
+            }
+        });
+        l3.await(5, TimeUnit.SECONDS);
+
+        // done
+        reader.close();
+    }
+
+    // basic write tests
+    static void doWriteTests() throws Exception {
+        final byte[] msg = "hello".getBytes();
+
+        DatagramChannel reader = DatagramChannel.open()
+            .bind(new InetSocketAddress(0));
+        int port = ((InetSocketAddress)(reader.getLocalAddress())).getPort();
+        InetAddress rh = InetAddress.getLocalHost();
+        SocketAddress sa = new InetSocketAddress(rh, port);
+
+        AsynchronousDatagramChannel ch = AsynchronousDatagramChannel.open();
+
+        // Test: unconnected
+        try {
+            ch.write(ByteBuffer.wrap(msg)).get();
+            throw new RuntimeException("NotYetConnectedException expected");
+        } catch (NotYetConnectedException e) {
+        }
+
+        // Test: connect, and write datagram
+        ch.connect(sa);
+        int bytesSent = ch.write(ByteBuffer.wrap(msg)).get();
+        if (bytesSent != msg.length)
+            throw new RuntimeException("Unexpected number of bytes sent");
+
+        // check received
+        ByteBuffer dst = ByteBuffer.allocateDirect(100);
+        reader.receive(dst);
+        dst.flip();
+        if (dst.remaining() != msg.length)
+            throw new RuntimeException("Unexpected number of bytes received");
+
+        // Test: write datagram and check completion handler is invoked
+        final CountDownLatch l2 = new CountDownLatch(1);
+        ch.write(ByteBuffer.wrap(msg), null, new CompletionHandler<Integer,Void>() {
+            public void completed(Integer bytesSent, Void att) {
+                if (bytesSent != msg.length)
+                    throw new RuntimeException("Unexpected number of bytes received");
+                l2.countDown();
+            }
+            public void failed (Throwable exc, Void att) {
+            }
+            public void cancelled(Void att) {
+            }
+        });
+        l2.await(5, TimeUnit.SECONDS);
+
+        // check received
+        dst.clear();
+        reader.receive(dst);
+        dst.flip();
+        if (dst.remaining() != msg.length)
+            throw new RuntimeException("Unexpected number of bytes received");
+
+        // done
+        ch.close();
+        reader.close();
+    }
+
+    // basic multicast test
+    static void doMulticastTests() throws Exception {
+        final byte[] msg = "hello".getBytes();
+
+        AsynchronousDatagramChannel ch = AsynchronousDatagramChannel
+            .open(StandardProtocolFamily.INET, null)
+            .setOption(StandardSocketOption.SO_REUSEADDR, true)
+            .bind(new InetSocketAddress(0));
+
+        InetAddress lh = InetAddress.getLocalHost();
+        int port = ((InetSocketAddress)(ch.getLocalAddress())).getPort();
+
+        // join group
+        InetAddress group = InetAddress.getByName("225.4.5.6");
+        NetworkInterface interf = NetworkInterface.getByInetAddress(lh);
+        MembershipKey key = ((MulticastChannel)ch).join(group, interf);
+
+        // check key
+        if (key.getChannel() != ch)
+            throw new RuntimeException("Not the expected channel");
+
+        // send message to group
+        DatagramChannel sender = DatagramChannel.open();
+        sender.send(ByteBuffer.wrap(msg), new InetSocketAddress(group, port));
+        sender.close();
+
+        // check message received
+        ByteBuffer dst = ByteBuffer.allocate(200);
+        SocketAddress source = ch.receive(dst).get(2, TimeUnit.SECONDS);
+        if (!((InetSocketAddress)source).getAddress().equals(lh))
+            throw new RuntimeException("Unexpected source");
+
+        // done
+        ch.close();
+    }
+}