diff options
Diffstat (limited to 'qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java')
-rw-r--r-- | qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java | 219 |
1 files changed, 46 insertions, 173 deletions
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java index 481b144caf..e79fe69199 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java @@ -50,9 +50,9 @@ import org.slf4j.LoggerFactory; public class AMQConnectionTest extends QpidBrokerTestCase { - private static AMQConnection _connection; - private static AMQTopic _topic; - private static AMQQueue _queue; + protected static AMQConnection _connection; + protected static AMQTopic _topic; + protected static AMQQueue _queue; private static QueueSession _queueSession; private static TopicSession _topicSession; protected static final Logger _logger = LoggerFactory.getLogger(AMQConnectionTest.class); @@ -60,15 +60,14 @@ public class AMQConnectionTest extends QpidBrokerTestCase protected void setUp() throws Exception { super.setUp(); - _connection = (AMQConnection) getConnection("guest", "guest"); + createConnection(); _topic = new AMQTopic(_connection.getDefaultTopicExchangeName(), new AMQShortString("mytopic")); _queue = new AMQQueue(_connection.getDefaultQueueExchangeName(), new AMQShortString("myqueue")); } - - protected void tearDown() throws Exception + + protected void createConnection() throws Exception { - _connection.close(); - super.tearDown(); + _connection = (AMQConnection) getConnection("guest", "guest"); } /** @@ -207,61 +206,50 @@ public class AMQConnectionTest extends QpidBrokerTestCase public void testPrefetchSystemProperty() throws Exception { - String oldPrefetch = System.getProperty(ClientProperties.MAX_PREFETCH_PROP_NAME); - try - { - _connection.close(); - System.setProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, new Integer(2).toString()); - _connection = (AMQConnection) getConnection(); - _connection.start(); - // Create two consumers on different sessions - Session consSessA = _connection.createSession(true, Session.AUTO_ACKNOWLEDGE); - MessageConsumer consumerA = consSessA.createConsumer(_queue); - - Session producerSession = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageProducer producer = producerSession.createProducer(_queue); + _connection.close(); + setTestClientSystemProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, new Integer(2).toString()); + + createConnection(); + _connection.start(); + // Create two consumers on different sessions + Session consSessA = _connection.createSession(true, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumerA = consSessA.createConsumer(_queue); - // Send 3 messages - for (int i = 0; i < 3; i++) - { - producer.send(producerSession.createTextMessage("test")); - } - - MessageConsumer consumerB = null; - // 0-8, 0-9, 0-9-1 prefetch is per session, not consumer. - if (!isBroker010()) - { - Session consSessB = _connection.createSession(true, Session.AUTO_ACKNOWLEDGE); - consumerB = consSessB.createConsumer(_queue); - } - else - { - consumerB = consSessA.createConsumer(_queue); - } + Session producerSession = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = producerSession.createProducer(_queue); - Message msg; - // Check that consumer A has 2 messages - for (int i = 0; i < 2; i++) - { - msg = consumerA.receive(1500); - assertNotNull("Consumer A should receive 2 messages",msg); - } - - msg = consumerA.receive(1500); - assertNull("Consumer A should not have received a 3rd message",msg); - - // Check that consumer B has the last message - msg = consumerB.receive(1500); - assertNotNull("Consumer B should have received the message",msg); + // Send 3 messages + for (int i = 0; i < 3; i++) + { + producer.send(producerSession.createTextMessage("test")); } - finally + + MessageConsumer consumerB = null; + // 0-8, 0-9, 0-9-1 prefetch is per session, not consumer. + if (!isBroker010()) { - if (oldPrefetch == null) - { - oldPrefetch = ClientProperties.MAX_PREFETCH_DEFAULT; - } - System.setProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, oldPrefetch); + Session consSessB = _connection.createSession(true, Session.AUTO_ACKNOWLEDGE); + consumerB = consSessB.createConsumer(_queue); + } + else + { + consumerB = consSessA.createConsumer(_queue); + } + + Message msg; + // Check that consumer A has 2 messages + for (int i = 0; i < 2; i++) + { + msg = consumerA.receive(1500); + assertNotNull("Consumer A should receive 2 messages",msg); } + + msg = consumerA.receive(1500); + assertNull("Consumer A should not have received a 3rd message",msg); + + // Check that consumer B has the last message + msg = consumerB.receive(1500); + assertNotNull("Consumer B should have received the message",msg); } public void testGetChannelID() throws Exception @@ -284,120 +272,5 @@ public class AMQConnectionTest extends QpidBrokerTestCase } } } - - /** - * Test Strategy : Kill -STOP the broker and see - * if the client terminates the connection with a - * read timeout. - * The broker process is cleaned up in the test itself - * and avoids using process.waitFor() as it hangs. - */ - public void testHeartBeat() throws Exception - { - boolean windows = - ((String) System.getProperties().get("os.name")).matches("(?i).*windows.*"); - - if (!isCppBroker() || windows) - { - return; - } - - Process process = null; - int port = getPort(0); - String pid = null; - try - { - // close the connection and shutdown the broker started by QpidTest - _connection.close(); - stopBroker(port); - - System.setProperty("qpid.heartbeat", "1"); - - // in case this broker gets stuck, atleast the rest of the tests will not fail. - port = port + 200; - String startCmd = getBrokerCommand(port); - - // start a broker using a script - ProcessBuilder pb = new ProcessBuilder(System.getProperty("broker.start")); - pb.redirectErrorStream(true); - - Map<String, String> env = pb.environment(); - env.put("BROKER_CMD",startCmd); - env.put("BROKER_READY",System.getProperty(BROKER_READY)); - - Process startScript = pb.start(); - startScript.waitFor(); - startScript.destroy(); - - Connection con = - new AMQConnection("amqp://guest:guest@clientid/testpath?brokerlist='tcp://localhost:" + port + "'"); - final AtomicBoolean lock = new AtomicBoolean(false); - - String cmd = "/usr/bin/pgrep -f " + port; - process = Runtime.getRuntime().exec("/bin/bash"); - LineNumberReader reader = new LineNumberReader(new InputStreamReader(process.getInputStream())); - PrintWriter out = new PrintWriter(new BufferedWriter(new OutputStreamWriter(process.getOutputStream())), true); - out.println(cmd); - pid = reader.readLine(); - try - { - Integer.parseInt(pid); - } - catch (NumberFormatException e) - { - // Error! try to read further to gather the error msg. - String line; - _logger.debug(pid); - while ((line = reader.readLine()) != null ) - { - _logger.debug(line); - } - throw new Exception( "Unable to get the brokers pid " + pid); - } - _logger.debug("pid : " + pid); - - con.setExceptionListener(new ExceptionListener(){ - - public void onException(JMSException e) - { - synchronized(lock) { - lock.set(true); - lock.notifyAll(); - } - } - }); - out.println("kill -STOP " + pid); - - synchronized(lock){ - lock.wait(2500); - } - out.close(); - reader.close(); - assertTrue("Client did not terminate the connection, check log for details",lock.get()); - } - catch(Exception e) - { - throw e; - } - finally - { - System.setProperty("qpid.heartbeat", ""); - - if (process != null) - { - process.destroy(); - } - - Process killScript = Runtime.getRuntime().exec(System.getProperty("broker.kill") + " " + pid); - killScript.waitFor(); - killScript.destroy(); - cleanBroker(); - } - } - - public static junit.framework.Test suite() - { - return new junit.framework.TestSuite(AMQConnectionTest.class); - } } |