summaryrefslogtreecommitdiff
path: root/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java')
-rw-r--r--qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java89
1 files changed, 54 insertions, 35 deletions
diff --git a/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java b/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java
index 375a326654..375d9c37b4 100644
--- a/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java
+++ b/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java
@@ -20,22 +20,31 @@
*/
package org.apache.qpid.transport;
-import org.apache.mina.util.AvailablePortFinder;
-
-import org.apache.qpid.test.utils.QpidTestCase;
-import org.apache.qpid.transport.network.ConnectionBinding;
-import org.apache.qpid.transport.network.io.IoAcceptor;
-import org.apache.qpid.transport.util.Logger;
-import org.apache.qpid.transport.util.Waiter;
+import static org.apache.qpid.transport.Option.*;
+import java.io.IOException;
+import java.net.Inet4Address;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
import java.util.ArrayList;
-import java.util.List;
import java.util.Collections;
+import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
-import java.io.IOException;
-import static org.apache.qpid.transport.Option.*;
+import org.apache.mina.util.AvailablePortFinder;
+import org.apache.qpid.test.utils.QpidTestCase;
+import org.apache.qpid.transport.Connection.State;
+import org.apache.qpid.transport.network.Assembler;
+import org.apache.qpid.transport.network.Disassembler;
+import org.apache.qpid.transport.network.InputHandler;
+import org.apache.qpid.transport.network.io.IoNetworkHandler;
+import org.apache.qpid.transport.network.io.IoSender;
+import org.apache.qpid.transport.util.Waiter;
+
+import sun.security.action.GetBooleanAction;
/**
* ConnectionTest
@@ -43,34 +52,20 @@ import static org.apache.qpid.transport.Option.*;
public class ConnectionTest extends QpidTestCase implements SessionListener
{
-
- private static final Logger log = Logger.get(ConnectionTest.class);
-
+ private static final String TEST_TRANSPORT = "org.apache.qpid.transport.TestNetworkTransport";
+ public static final int MAX_FRAME_SIZE = 64 * 1024 - 1;
+
private int port;
private volatile boolean queue = false;
private List<MessageTransfer> messages = new ArrayList<MessageTransfer>();
private List<MessageTransfer> incoming = new ArrayList<MessageTransfer>();
- private IoAcceptor _ioa = null;
-
-
protected void setUp() throws Exception
{
- super.setUp();
-
+ setSystemProperty("qpid.transport", TEST_TRANSPORT);
port = AvailablePortFinder.getNextAvailable(12000);
}
- protected void tearDown() throws Exception
- {
- if (_ioa != null)
- {
- _ioa.close();
- }
-
- super.tearDown();
- }
-
public void opened(Session ssn) {}
public void resumed(Session ssn) {}
@@ -88,6 +83,7 @@ public class ConnectionTest extends QpidTestCase implements SessionListener
if (body.startsWith("CLOSE"))
{
+ ssn.processed(xfr);
ssn.getConnection().close();
}
else if (body.startsWith("DELAYED_CLOSE"))
@@ -174,6 +170,7 @@ public class ConnectionTest extends QpidTestCase implements SessionListener
}
}
});
+ conn.setState(State.OPEN);
conn.connect("localhost", port, null, "guest", "guest", false);
return conn;
}
@@ -211,7 +208,7 @@ public class ConnectionTest extends QpidTestCase implements SessionListener
private void startServer(final ProtocolHeader protocolHeader)
{
- ConnectionDelegate server = new ServerDelegate()
+ ConnectionDelegate delegate = new ServerDelegate()
{
@Override
public void init(Connection conn, ProtocolHeader hdr)
@@ -230,18 +227,39 @@ public class ConnectionTest extends QpidTestCase implements SessionListener
return ssn;
}
};
-
+
try
{
- _ioa = new IoAcceptor("localhost", port, ConnectionBinding.get(server));
+ final ServerSocket server = new ServerSocket();
+ server.setReuseAddress(true);
+ server.bind(new InetSocketAddress(InetAddress.getByName("localhost"), port));
+
+ final Connection conn = new Connection();
+ conn.setConnectionDelegate(delegate);
+
+ Thread accept = new Thread(new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ Socket client = server.accept();
+ conn.setSender(new Disassembler(new IoSender(client, 32768, 30000), MAX_FRAME_SIZE));
+ Thread receiver = new Thread(new IoNetworkHandler(client, new InputHandler(new Assembler(conn)), 32768));
+ receiver.start();
+ }
+ catch (IOException e)
+ {
+ // ignore
+ }
+ }
+ });
+ accept.start();
}
catch (IOException e)
{
- e.printStackTrace();
- fail("Unable to start Server for test due to:" + e.getMessage());
+ // ignore
}
-
- _ioa.start();
}
public void testClosedNotificationAndWriteToClosed() throws Exception
@@ -417,6 +435,7 @@ public class ConnectionTest extends QpidTestCase implements SessionListener
try
{
send(ssn, "SINK 1");
+ fail("this should have failed");
}
catch (SessionException exc)
{