diff options
Diffstat (limited to 'qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java')
-rw-r--r-- | qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java | 89 |
1 files changed, 54 insertions, 35 deletions
diff --git a/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java b/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java index 375a326654..375d9c37b4 100644 --- a/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java +++ b/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java @@ -20,22 +20,31 @@ */ package org.apache.qpid.transport; -import org.apache.mina.util.AvailablePortFinder; - -import org.apache.qpid.test.utils.QpidTestCase; -import org.apache.qpid.transport.network.ConnectionBinding; -import org.apache.qpid.transport.network.io.IoAcceptor; -import org.apache.qpid.transport.util.Logger; -import org.apache.qpid.transport.util.Waiter; +import static org.apache.qpid.transport.Option.*; +import java.io.IOException; +import java.net.Inet4Address; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.net.Socket; import java.util.ArrayList; -import java.util.List; import java.util.Collections; +import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import java.io.IOException; -import static org.apache.qpid.transport.Option.*; +import org.apache.mina.util.AvailablePortFinder; +import org.apache.qpid.test.utils.QpidTestCase; +import org.apache.qpid.transport.Connection.State; +import org.apache.qpid.transport.network.Assembler; +import org.apache.qpid.transport.network.Disassembler; +import org.apache.qpid.transport.network.InputHandler; +import org.apache.qpid.transport.network.io.IoNetworkHandler; +import org.apache.qpid.transport.network.io.IoSender; +import org.apache.qpid.transport.util.Waiter; + +import sun.security.action.GetBooleanAction; /** * ConnectionTest @@ -43,34 +52,20 @@ import static org.apache.qpid.transport.Option.*; public class ConnectionTest extends QpidTestCase implements SessionListener { - - private static final Logger log = Logger.get(ConnectionTest.class); - + private static final String TEST_TRANSPORT = "org.apache.qpid.transport.TestNetworkTransport"; + public static final int MAX_FRAME_SIZE = 64 * 1024 - 1; + private int port; private volatile boolean queue = false; private List<MessageTransfer> messages = new ArrayList<MessageTransfer>(); private List<MessageTransfer> incoming = new ArrayList<MessageTransfer>(); - private IoAcceptor _ioa = null; - - protected void setUp() throws Exception { - super.setUp(); - + setSystemProperty("qpid.transport", TEST_TRANSPORT); port = AvailablePortFinder.getNextAvailable(12000); } - protected void tearDown() throws Exception - { - if (_ioa != null) - { - _ioa.close(); - } - - super.tearDown(); - } - public void opened(Session ssn) {} public void resumed(Session ssn) {} @@ -88,6 +83,7 @@ public class ConnectionTest extends QpidTestCase implements SessionListener if (body.startsWith("CLOSE")) { + ssn.processed(xfr); ssn.getConnection().close(); } else if (body.startsWith("DELAYED_CLOSE")) @@ -174,6 +170,7 @@ public class ConnectionTest extends QpidTestCase implements SessionListener } } }); + conn.setState(State.OPEN); conn.connect("localhost", port, null, "guest", "guest", false); return conn; } @@ -211,7 +208,7 @@ public class ConnectionTest extends QpidTestCase implements SessionListener private void startServer(final ProtocolHeader protocolHeader) { - ConnectionDelegate server = new ServerDelegate() + ConnectionDelegate delegate = new ServerDelegate() { @Override public void init(Connection conn, ProtocolHeader hdr) @@ -230,18 +227,39 @@ public class ConnectionTest extends QpidTestCase implements SessionListener return ssn; } }; - + try { - _ioa = new IoAcceptor("localhost", port, ConnectionBinding.get(server)); + final ServerSocket server = new ServerSocket(); + server.setReuseAddress(true); + server.bind(new InetSocketAddress(InetAddress.getByName("localhost"), port)); + + final Connection conn = new Connection(); + conn.setConnectionDelegate(delegate); + + Thread accept = new Thread(new Runnable() + { + public void run() + { + try + { + Socket client = server.accept(); + conn.setSender(new Disassembler(new IoSender(client, 32768, 30000), MAX_FRAME_SIZE)); + Thread receiver = new Thread(new IoNetworkHandler(client, new InputHandler(new Assembler(conn)), 32768)); + receiver.start(); + } + catch (IOException e) + { + // ignore + } + } + }); + accept.start(); } catch (IOException e) { - e.printStackTrace(); - fail("Unable to start Server for test due to:" + e.getMessage()); + // ignore } - - _ioa.start(); } public void testClosedNotificationAndWriteToClosed() throws Exception @@ -417,6 +435,7 @@ public class ConnectionTest extends QpidTestCase implements SessionListener try { send(ssn, "SINK 1"); + fail("this should have failed"); } catch (SessionException exc) { |