summaryrefslogtreecommitdiff
path: root/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java')
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java219
1 files changed, 46 insertions, 173 deletions
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java
index 481b144caf..e79fe69199 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java
@@ -50,9 +50,9 @@ import org.slf4j.LoggerFactory;
public class AMQConnectionTest extends QpidBrokerTestCase
{
- private static AMQConnection _connection;
- private static AMQTopic _topic;
- private static AMQQueue _queue;
+ protected static AMQConnection _connection;
+ protected static AMQTopic _topic;
+ protected static AMQQueue _queue;
private static QueueSession _queueSession;
private static TopicSession _topicSession;
protected static final Logger _logger = LoggerFactory.getLogger(AMQConnectionTest.class);
@@ -60,15 +60,14 @@ public class AMQConnectionTest extends QpidBrokerTestCase
protected void setUp() throws Exception
{
super.setUp();
- _connection = (AMQConnection) getConnection("guest", "guest");
+ createConnection();
_topic = new AMQTopic(_connection.getDefaultTopicExchangeName(), new AMQShortString("mytopic"));
_queue = new AMQQueue(_connection.getDefaultQueueExchangeName(), new AMQShortString("myqueue"));
}
-
- protected void tearDown() throws Exception
+
+ protected void createConnection() throws Exception
{
- _connection.close();
- super.tearDown();
+ _connection = (AMQConnection) getConnection("guest", "guest");
}
/**
@@ -207,61 +206,50 @@ public class AMQConnectionTest extends QpidBrokerTestCase
public void testPrefetchSystemProperty() throws Exception
{
- String oldPrefetch = System.getProperty(ClientProperties.MAX_PREFETCH_PROP_NAME);
- try
- {
- _connection.close();
- System.setProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, new Integer(2).toString());
- _connection = (AMQConnection) getConnection();
- _connection.start();
- // Create two consumers on different sessions
- Session consSessA = _connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer consumerA = consSessA.createConsumer(_queue);
-
- Session producerSession = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer producer = producerSession.createProducer(_queue);
+ _connection.close();
+ setTestClientSystemProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, new Integer(2).toString());
+
+ createConnection();
+ _connection.start();
+ // Create two consumers on different sessions
+ Session consSessA = _connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumerA = consSessA.createConsumer(_queue);
- // Send 3 messages
- for (int i = 0; i < 3; i++)
- {
- producer.send(producerSession.createTextMessage("test"));
- }
-
- MessageConsumer consumerB = null;
- // 0-8, 0-9, 0-9-1 prefetch is per session, not consumer.
- if (!isBroker010())
- {
- Session consSessB = _connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
- consumerB = consSessB.createConsumer(_queue);
- }
- else
- {
- consumerB = consSessA.createConsumer(_queue);
- }
+ Session producerSession = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = producerSession.createProducer(_queue);
- Message msg;
- // Check that consumer A has 2 messages
- for (int i = 0; i < 2; i++)
- {
- msg = consumerA.receive(1500);
- assertNotNull("Consumer A should receive 2 messages",msg);
- }
-
- msg = consumerA.receive(1500);
- assertNull("Consumer A should not have received a 3rd message",msg);
-
- // Check that consumer B has the last message
- msg = consumerB.receive(1500);
- assertNotNull("Consumer B should have received the message",msg);
+ // Send 3 messages
+ for (int i = 0; i < 3; i++)
+ {
+ producer.send(producerSession.createTextMessage("test"));
}
- finally
+
+ MessageConsumer consumerB = null;
+ // 0-8, 0-9, 0-9-1 prefetch is per session, not consumer.
+ if (!isBroker010())
{
- if (oldPrefetch == null)
- {
- oldPrefetch = ClientProperties.MAX_PREFETCH_DEFAULT;
- }
- System.setProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, oldPrefetch);
+ Session consSessB = _connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+ consumerB = consSessB.createConsumer(_queue);
+ }
+ else
+ {
+ consumerB = consSessA.createConsumer(_queue);
+ }
+
+ Message msg;
+ // Check that consumer A has 2 messages
+ for (int i = 0; i < 2; i++)
+ {
+ msg = consumerA.receive(1500);
+ assertNotNull("Consumer A should receive 2 messages",msg);
}
+
+ msg = consumerA.receive(1500);
+ assertNull("Consumer A should not have received a 3rd message",msg);
+
+ // Check that consumer B has the last message
+ msg = consumerB.receive(1500);
+ assertNotNull("Consumer B should have received the message",msg);
}
public void testGetChannelID() throws Exception
@@ -284,120 +272,5 @@ public class AMQConnectionTest extends QpidBrokerTestCase
}
}
}
-
- /**
- * Test Strategy : Kill -STOP the broker and see
- * if the client terminates the connection with a
- * read timeout.
- * The broker process is cleaned up in the test itself
- * and avoids using process.waitFor() as it hangs.
- */
- public void testHeartBeat() throws Exception
- {
- boolean windows =
- ((String) System.getProperties().get("os.name")).matches("(?i).*windows.*");
-
- if (!isCppBroker() || windows)
- {
- return;
- }
-
- Process process = null;
- int port = getPort(0);
- String pid = null;
- try
- {
- // close the connection and shutdown the broker started by QpidTest
- _connection.close();
- stopBroker(port);
-
- System.setProperty("qpid.heartbeat", "1");
-
- // in case this broker gets stuck, atleast the rest of the tests will not fail.
- port = port + 200;
- String startCmd = getBrokerCommand(port);
-
- // start a broker using a script
- ProcessBuilder pb = new ProcessBuilder(System.getProperty("broker.start"));
- pb.redirectErrorStream(true);
-
- Map<String, String> env = pb.environment();
- env.put("BROKER_CMD",startCmd);
- env.put("BROKER_READY",System.getProperty(BROKER_READY));
-
- Process startScript = pb.start();
- startScript.waitFor();
- startScript.destroy();
-
- Connection con =
- new AMQConnection("amqp://guest:guest@clientid/testpath?brokerlist='tcp://localhost:" + port + "'");
- final AtomicBoolean lock = new AtomicBoolean(false);
-
- String cmd = "/usr/bin/pgrep -f " + port;
- process = Runtime.getRuntime().exec("/bin/bash");
- LineNumberReader reader = new LineNumberReader(new InputStreamReader(process.getInputStream()));
- PrintWriter out = new PrintWriter(new BufferedWriter(new OutputStreamWriter(process.getOutputStream())), true);
- out.println(cmd);
- pid = reader.readLine();
- try
- {
- Integer.parseInt(pid);
- }
- catch (NumberFormatException e)
- {
- // Error! try to read further to gather the error msg.
- String line;
- _logger.debug(pid);
- while ((line = reader.readLine()) != null )
- {
- _logger.debug(line);
- }
- throw new Exception( "Unable to get the brokers pid " + pid);
- }
- _logger.debug("pid : " + pid);
-
- con.setExceptionListener(new ExceptionListener(){
-
- public void onException(JMSException e)
- {
- synchronized(lock) {
- lock.set(true);
- lock.notifyAll();
- }
- }
- });
- out.println("kill -STOP " + pid);
-
- synchronized(lock){
- lock.wait(2500);
- }
- out.close();
- reader.close();
- assertTrue("Client did not terminate the connection, check log for details",lock.get());
- }
- catch(Exception e)
- {
- throw e;
- }
- finally
- {
- System.setProperty("qpid.heartbeat", "");
-
- if (process != null)
- {
- process.destroy();
- }
-
- Process killScript = Runtime.getRuntime().exec(System.getProperty("broker.kill") + " " + pid);
- killScript.waitFor();
- killScript.destroy();
- cleanBroker();
- }
- }
-
- public static junit.framework.Test suite()
- {
- return new junit.framework.TestSuite(AMQConnectionTest.class);
- }
}