summaryrefslogtreecommitdiff
path: root/qpid/java/systests/src/main/java/org/apache/qpid/test/unit
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/systests/src/main/java/org/apache/qpid/test/unit')
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/Acknowledge2ConsumersTest.java4
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/QuickAcking.java2
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/MultipleConnectionTest.java1
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java6
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/DynamicQueueExchangeCreateTest.java22
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java1
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java105
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java64
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java25
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java7
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ct/DurableSubscriberTest.java4
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java41
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutConfigurationTest.java82
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutDisabledTest.java72
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTest.java335
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTestCase.java253
16 files changed, 119 insertions, 905 deletions
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/Acknowledge2ConsumersTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/Acknowledge2ConsumersTest.java
index 23efb656d2..4b45a96c20 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/Acknowledge2ConsumersTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/Acknowledge2ConsumersTest.java
@@ -23,7 +23,7 @@ package org.apache.qpid.test.unit.ack;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.apache.qpid.test.utils.FailoverBaseCase;
import javax.jms.Connection;
import javax.jms.JMSException;
@@ -32,7 +32,7 @@ import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
-public class Acknowledge2ConsumersTest extends QpidBrokerTestCase
+public class Acknowledge2ConsumersTest extends FailoverBaseCase
{
protected static int NUM_MESSAGES = 100;
protected Connection _con;
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/QuickAcking.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/QuickAcking.java
index 13c78c1e14..6c83136511 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/QuickAcking.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/QuickAcking.java
@@ -20,7 +20,7 @@
*/
package org.apache.qpid.test.unit.ack;
-import java.util.concurrent.CountDownLatch;
+import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.jms.ConnectionListener;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/MultipleConnectionTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/MultipleConnectionTest.java
index 87eae32cf8..3a5f676ca6 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/MultipleConnectionTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/MultipleConnectionTest.java
@@ -23,6 +23,7 @@ import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.client.transport.TransportConnection;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java
index 481b144caf..292bcd6039 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.test.unit.client;
+import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.InputStreamReader;
import java.io.LineNumberReader;
@@ -36,9 +37,11 @@ import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.QueueSession;
import javax.jms.Session;
+import javax.jms.TextMessage;
import javax.jms.TopicSession;
import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQConnectionDelegate_0_10;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.AMQTopic;
@@ -228,8 +231,7 @@ public class AMQConnectionTest extends QpidBrokerTestCase
}
MessageConsumer consumerB = null;
- // 0-8, 0-9, 0-9-1 prefetch is per session, not consumer.
- if (!isBroker010())
+ if (isBroker08())
{
Session consSessB = _connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
consumerB = consSessB.createConsumer(_queue);
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/DynamicQueueExchangeCreateTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/DynamicQueueExchangeCreateTest.java
index 8577fb5b6a..33575b58aa 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/DynamicQueueExchangeCreateTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/DynamicQueueExchangeCreateTest.java
@@ -20,8 +20,6 @@
*/
package org.apache.qpid.test.unit.client;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
import javax.jms.Connection;
@@ -34,9 +32,11 @@ import javax.jms.Session;
*
* Test to validate that setting the respective qpid.declare_queues,
* qpid.declare_exchanges system properties functions as expected.
+ *
*/
public class DynamicQueueExchangeCreateTest extends QpidBrokerTestCase
{
+
public void testQueueDeclare() throws Exception
{
setSystemProperty("qpid.declare_queues", "false");
@@ -53,8 +53,11 @@ public class DynamicQueueExchangeCreateTest extends QpidBrokerTestCase
fail("JMSException should be thrown as the queue does not exist");
}
catch (JMSException e)
- {
- checkExceptionErrorCode(e, AMQConstant.NOT_FOUND);
+ {
+ assertTrue("Exception should be that the queue does not exist :" +
+ e.getMessage(),
+ e.getMessage().contains("does not exist"));
+
}
}
@@ -76,15 +79,10 @@ public class DynamicQueueExchangeCreateTest extends QpidBrokerTestCase
}
catch (JMSException e)
{
- checkExceptionErrorCode(e, AMQConstant.NOT_FOUND);
+ assertTrue("Exception should be that the exchange does not exist :" +
+ e.getMessage(),
+ e.getMessage().contains("Exchange " + EXCHANGE_TYPE + " does not exist"));
}
}
- private void checkExceptionErrorCode(JMSException original, AMQConstant code)
- {
- Exception linked = original.getLinkedException();
- assertNotNull("Linked exception should have been set", linked);
- assertTrue("Linked exception should be an AMQException", linked instanceof AMQException);
- assertEquals("Error code should be " + code.getCode(), code, ((AMQException) linked).getErrorCode());
- }
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java
index aae8b1feb9..79e2ff8148 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java
@@ -24,6 +24,7 @@ import junit.textui.TestRunner;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.transport.TransportConnection;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
import org.slf4j.Logger;
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java
index 2e8a2d049d..f0794c9dab 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java
@@ -25,9 +25,11 @@ import org.apache.qpid.test.utils.QpidBrokerTestCase;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.client.transport.TransportConnection;
import org.apache.qpid.framing.*;
import org.apache.qpid.jms.ConnectionListener;
import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.url.URLSyntaxException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,67 +49,70 @@ public class ChannelCloseTest extends QpidBrokerTestCase implements ExceptionLis
private static final Logger _logger = LoggerFactory.getLogger(ChannelCloseTest.class);
Connection _connection;
+ private String _brokerlist = "vm://:1";
private Session _session;
private static final long SYNC_TIMEOUT = 500;
private int TEST = 0;
- /**
- * Close channel, use chanel with same id ensure error.
- *
- * This test is only valid for non 0-10 connection .
+ /*
+ close channel, use chanel with same id ensure error.
*/
public void testReusingChannelAfterFullClosure() throws Exception
{
- _connection=newConnection();
-
- // Create Producer
- try
+ // this is testing an inVM Connetion conneciton
+ if (isJavaBroker() && !isExternalBroker())
{
- _connection.start();
-
- createChannelAndTest(1);
+ _connection=newConnection();
- // Cause it to close
+ // Create Producer
try
{
- _logger.info("Testing invalid exchange");
- declareExchange(1, "", "name_that_will_lookup_to_null", false);
- fail("Exchange name is empty so this should fail ");
- }
- catch (AMQException e)
- {
- assertEquals("Exchange should not be found", AMQConstant.NOT_FOUND, e.getErrorCode());
- }
+ _connection.start();
- // Check that
- try
- {
- _logger.info("Testing valid exchange should fail");
- declareExchange(1, "topic", "amq.topic", false);
- fail("This should not succeed as the channel should be closed ");
- }
- catch (AMQException e)
- {
- if (_logger.isInfoEnabled())
+ createChannelAndTest(1);
+
+ // Cause it to close
+ try
+ {
+ _logger.info("Testing invalid exchange");
+ declareExchange(1, "", "name_that_will_lookup_to_null", false);
+ fail("Exchange name is empty so this should fail ");
+ }
+ catch (AMQException e)
{
- _logger.info("Exception occured was:" + e.getErrorCode());
+ assertEquals("Exchange should not be found", AMQConstant.NOT_FOUND, e.getErrorCode());
}
- assertEquals("Connection should be closed", AMQConstant.CHANNEL_ERROR, e.getErrorCode());
+ // Check that
+ try
+ {
+ _logger.info("Testing valid exchange should fail");
+ declareExchange(1, "topic", "amq.topic", false);
+ fail("This should not succeed as the channel should be closed ");
+ }
+ catch (AMQException e)
+ {
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Exception occured was:" + e.getErrorCode());
+ }
- _connection=newConnection();
- }
+ assertEquals("Connection should be closed", AMQConstant.CHANNEL_ERROR, e.getErrorCode());
- checkSendingMessage();
+ _connection=newConnection();
+ }
- _session.close();
- _connection.close();
+ checkSendingMessage();
- }
- catch (JMSException e)
- {
- e.printStackTrace();
- fail(e.getMessage());
+ _session.close();
+ _connection.close();
+
+ }
+ catch (JMSException e)
+ {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
}
}
@@ -301,19 +306,27 @@ public class ChannelCloseTest extends QpidBrokerTestCase implements ExceptionLis
private Connection newConnection()
{
- Connection connection = null;
+ AMQConnection connection = null;
try
{
- connection = getConnection();
+ connection = new AMQConnection("amqp://guest:guest@CCTTest/test?brokerlist='" + _brokerlist + "'");
- ((AMQConnection) connection).setConnectionListener(this);
+ connection.setConnectionListener(this);
_session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
connection.start();
}
- catch (Exception e)
+ catch (JMSException e)
+ {
+ fail("Creating new connection when:" + e.getMessage());
+ }
+ catch (AMQException e)
+ {
+ fail("Creating new connection when:" + e.getMessage());
+ }
+ catch (URLSyntaxException e)
{
fail("Creating new connection when:" + e.getMessage());
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java
index 124e756fad..04fc611cd1 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java
@@ -20,30 +20,32 @@
*/
package org.apache.qpid.test.unit.client.connection;
-import javax.jms.Connection;
-import javax.jms.QueueSession;
-import javax.jms.TopicSession;
-
import org.apache.qpid.AMQConnectionFailureException;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQUnresolvedAddressException;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.apache.qpid.client.AMQAuthenticationException;
import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQConnectionURL;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.client.AMQConnectionURL;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.jms.BrokerDetails;
-import org.apache.qpid.jms.ConnectionURL;
import org.apache.qpid.jms.Session;
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.apache.qpid.jms.ConnectionURL;
+import org.apache.qpid.jms.BrokerDetails;
+
+import javax.jms.Connection;
+import javax.jms.QueueSession;
+import javax.jms.TopicSession;
+import javax.naming.NamingException;
public class ConnectionTest extends QpidBrokerTestCase
{
- String _broker_NotRunning = "tcp://localhost:" + findFreePort();
-
+ String _broker_NotRunning = "vm://:2";
String _broker_BadDNS = "tcp://hg3sgaaw4lgihjs";
public void testSimpleConnection() throws Exception
@@ -85,17 +87,17 @@ public class ConnectionTest extends QpidBrokerTestCase
AMQSession sess = (AMQSession) conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- sess.declareExchange(new AMQShortString("test.direct"),
+
+ sess.declareExchange(new AMQShortString("test.direct"),
ExchangeDefaults.DIRECT_EXCHANGE_CLASS, false);
- sess.declareExchange(new AMQShortString("tmp.direct"),
+ sess.declareExchange(new AMQShortString("tmp.direct"),
ExchangeDefaults.DIRECT_EXCHANGE_CLASS, false);
- sess.declareExchange(new AMQShortString("tmp.topic"),
+ sess.declareExchange(new AMQShortString("tmp.topic"),
ExchangeDefaults.TOPIC_EXCHANGE_CLASS, false);
- sess.declareExchange(new AMQShortString("test.topic"),
+ sess.declareExchange(new AMQShortString("test.topic"),
ExchangeDefaults.TOPIC_EXCHANGE_CLASS, false);
QueueSession queueSession = conn.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -111,7 +113,7 @@ public class ConnectionTest extends QpidBrokerTestCase
queueSession.close();
TopicSession topicSession = conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
-
+
AMQTopic topic = (AMQTopic) topicSession.createTopic("silly.topic");
assertEquals(topic.getExchangeName().toString(), "test.topic");
@@ -269,7 +271,7 @@ public class ConnectionTest extends QpidBrokerTestCase
}
connection.close();
}
-
+
public void testUnsupportedSASLMechanism() throws Exception
{
BrokerDetails broker = getBroker();
@@ -287,37 +289,11 @@ public class ConnectionTest extends QpidBrokerTestCase
{
assertTrue("Incorrect exception thrown",
e.getMessage().contains("The following SASL mechanisms " +
- "[MY_MECH]" +
+ "[MY_MECH]" +
" specified by the client are not supported by the broker"));
}
}
- public void testClientIDVerification() throws Exception
- {
- System.setProperty("qpid.verify_client_id", "true");
- BrokerDetails broker = getBroker();
- try
- {
- Connection con = new AMQConnection(broker.toString(), "guest", "guest",
- "client_id", "test");
-
- Connection con2 = new AMQConnection(broker.toString(), "guest", "guest",
- "client_id", "test");
-
- fail("The client should throw a ConnectionException stating the" +
- " client ID is not unique");
- }
- catch (Exception e)
- {
- assertTrue("Incorrect exception thrown",
- e.getMessage().contains("ClientID must be unique"));
- }
- finally
- {
- System.setProperty("qpid.verify_client_id", "false");
- }
- }
-
public static junit.framework.Test suite()
{
return new junit.framework.TestSuite(ConnectionTest.class);
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java
index 5701b5a1fd..278b9e9c04 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java
@@ -31,21 +31,21 @@ import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
-import org.apache.qpid.transport.TestNetworkConnection;
+import org.apache.qpid.transport.TestNetworkDriver;
public class AMQProtocolSessionTest extends QpidBrokerTestCase
{
- private static class TestProtocolSession extends AMQProtocolSession
+ private static class AMQProtSession extends AMQProtocolSession
{
- public TestProtocolSession(AMQProtocolHandler protocolHandler, AMQConnection connection)
+ public AMQProtSession(AMQProtocolHandler protocolHandler, AMQConnection connection)
{
super(protocolHandler,connection);
}
- public TestNetworkConnection getNetworkConnection()
+ public TestNetworkDriver getNetworkDriver()
{
- return (TestNetworkConnection) _protocolHandler.getNetworkConnection();
+ return (TestNetworkDriver) _protocolHandler.getNetworkDriver();
}
public AMQShortString genQueueName()
@@ -54,7 +54,7 @@ public class AMQProtocolSessionTest extends QpidBrokerTestCase
}
}
- private TestProtocolSession _testSession;
+ private AMQProtSession _testSession;
protected void setUp() throws Exception
{
@@ -62,10 +62,10 @@ public class AMQProtocolSessionTest extends QpidBrokerTestCase
AMQConnection con = (AMQConnection) getConnection("guest", "guest");
AMQProtocolHandler protocolHandler = new AMQProtocolHandler(con);
- protocolHandler.setNetworkConnection(new TestNetworkConnection());
-
+ protocolHandler.setNetworkDriver(new TestNetworkDriver());
+
//don't care about the values set here apart from the dummy IoSession
- _testSession = new TestProtocolSession(protocolHandler , con);
+ _testSession = new AMQProtSession(protocolHandler , con);
}
public void testTemporaryQueueWildcard() throws UnknownHostException
@@ -93,9 +93,14 @@ public class AMQProtocolSessionTest extends QpidBrokerTestCase
checkTempQueueName(new InetSocketAddress(InetAddress.getByName("1080:0:0:0:8:800:200C:417A"), 1234), "tmp_1080_0_0_0_8_800_200c_417a_1234_1");
}
+ public void testTemporaryQueuePipe() throws UnknownHostException
+ {
+ checkTempQueueName(new VmPipeAddress(1), "tmp_vm_1_1");
+ }
+
private void checkTempQueueName(SocketAddress address, String queueName)
{
- _testSession.getNetworkConnection().setLocalAddress(address);
+ _testSession.getNetworkDriver().setLocalAddress(address);
assertEquals("Wrong queue name", queueName, _testSession.genQueueName().asString());
}
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java
index f5e0ed75d2..de092fc893 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java
@@ -50,6 +50,7 @@ public class MessageRequeueTest extends QpidBrokerTestCase
protected final String queue = "direct://amq.direct//message-requeue-test-queue";
protected String payload = "Message:";
+ //protected final String BROKER = "vm://:1";
protected final String BROKER = "tcp://127.0.0.1:5672";
private boolean testReception = true;
@@ -154,8 +155,8 @@ public class MessageRequeueTest extends QpidBrokerTestCase
_logger.info("consumed: " + messagesReceived);
assertEquals("number of consumed messages does not match initial data", (int) numTestMessages, messagesReceived);
- // with 0_10 we can have a delivery tag of 0
- if (!conn.isBroker010())
+ // wit 0_10 we can have a delivery tag of 0
+ if (conn.isBroker08())
{
for (long b : messageLog)
{
@@ -223,7 +224,7 @@ public class MessageRequeueTest extends QpidBrokerTestCase
StringBuilder list = new StringBuilder();
list.append("Failed to receive:");
int failed = 0;
- if (!conn.isBroker010())
+ if (conn.isBroker08())
{
for (long b : receieved)
{
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ct/DurableSubscriberTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ct/DurableSubscriberTest.java
index 80422cf3e9..989ac98747 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ct/DurableSubscriberTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ct/DurableSubscriberTest.java
@@ -52,7 +52,7 @@ public class DurableSubscriberTest extends QpidBrokerTestCase
*/
public void testDurSubRestoredAfterNonPersistentMessageSent() throws Exception
{
- if (isBrokerStorePersistent())
+ if (isBrokerStorePersistent() || !isBroker08())
{
TopicConnectionFactory factory = getConnectionFactory();
Topic topic = (Topic) getInitialContext().lookup(_topicName);
@@ -116,7 +116,7 @@ public class DurableSubscriberTest extends QpidBrokerTestCase
*/
public void testDurSubRestoresMessageSelector() throws Exception
{
- if (isBrokerStorePersistent())
+ if (isBrokerStorePersistent() || !isBroker08())
{
TopicConnectionFactory factory = getConnectionFactory();
Topic topic = (Topic) getInitialContext().lookup(_topicName);
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java
index 97452ad1c8..830421a01f 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java
@@ -25,14 +25,12 @@ import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.message.NonQpidObjectMessage;
-import org.apache.qpid.client.message.QpidMessageProperties;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
@@ -41,11 +39,7 @@ import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.Session;
-import javax.jms.Topic;
-
import java.util.Enumeration;
-import java.util.HashMap;
-import java.util.Map;
/**
* @author Apache Software Foundation
@@ -169,39 +163,4 @@ public class JMSPropertiesTest extends QpidBrokerTestCase
con.close();
}
- /**
- * Test Goal : Test if custom message properties can be set and retrieved properly with out an error.
- * Also test if unsupported properties are filtered out. See QPID-2930.
- */
- public void testQpidExtensionProperties() throws Exception
- {
- Connection con = getConnection("guest", "guest");
- Session ssn = (AMQSession) con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- con.start();
-
- Topic topic = ssn.createTopic("test");
- MessageConsumer consumer = ssn.createConsumer(topic);
- MessageProducer prod = ssn.createProducer(topic);
- Message m = ssn.createMessage();
- m.setObjectProperty("foo-bar", "foobar".getBytes());
- m.setObjectProperty(QpidMessageProperties.AMQP_0_10_APP_ID, "my-app-id");
- prod.send(m);
-
- Message msg = consumer.receive(1000);
- assertNotNull(msg);
-
- Enumeration<String> enu = msg.getPropertyNames();
- Map<String,String> map = new HashMap<String,String>();
- while (enu.hasMoreElements())
- {
- String name = enu.nextElement();
- String value = msg.getStringProperty(name);
- map.put(name, value);
- }
-
- assertFalse("Property 'foo-bar' should have been filtered out",map.containsKey("foo-bar"));
- assertEquals("Property "+ QpidMessageProperties.AMQP_0_10_APP_ID + " should be present","my-app-id",msg.getStringProperty(QpidMessageProperties.AMQP_0_10_APP_ID));
- assertEquals("Property "+ QpidMessageProperties.AMQP_0_10_ROUTING_KEY + " should be present","test",msg.getStringProperty(QpidMessageProperties.AMQP_0_10_ROUTING_KEY));
-
- }
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutConfigurationTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutConfigurationTest.java
deleted file mode 100644
index 36bac3b715..0000000000
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutConfigurationTest.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.test.unit.transacted;
-
-/**
- * This verifies that changing the {@code transactionTimeout} configuration will alter
- * the behaviour of the transaction open and idle logging, and that when the connection
- * will be closed.
- */
-public class TransactionTimeoutConfigurationTest extends TransactionTimeoutTestCase
-{
- @Override
- protected void configure() throws Exception
- {
- // Setup housekeeping every second
- setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".housekeeping.expiredMessageCheckPeriod", "100");
-
- // Set transaction timout properties.
- setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".transactionTimeout.openWarn", "200");
- setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".transactionTimeout.openClose", "1000");
- setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".transactionTimeout.idleWarn", "100");
- setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".transactionTimeout.idleClose", "500");
- }
-
- public void testProducerIdleCommit() throws Exception
- {
- try
- {
- send(5, 0);
-
- sleep(2.0f);
-
- _psession.commit();
- fail("should fail");
- }
- catch (Exception e)
- {
- _exception = e;
- }
-
- monitor(5, 0);
-
- check(IDLE);
- }
-
- public void testProducerOpenCommit() throws Exception
- {
- try
- {
- send(5, 0.3f);
-
- _psession.commit();
- fail("should fail");
- }
- catch (Exception e)
- {
- _exception = e;
- }
-
- monitor(6, 3);
-
- check(OPEN);
- }
-}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutDisabledTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutDisabledTest.java
deleted file mode 100644
index 71b89bf911..0000000000
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutDisabledTest.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.test.unit.transacted;
-
-/**
- * This verifies that the default behaviour is not to time out transactions.
- */
-public class TransactionTimeoutDisabledTest extends TransactionTimeoutTestCase
-{
- @Override
- protected void configure() throws Exception
- {
- // Setup housekeeping every second
- setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".housekeeping.expiredMessageCheckPeriod", "100");
- }
-
- public void testProducerIdleCommit() throws Exception
- {
- try
- {
- send(5, 0);
-
- sleep(2.0f);
-
- _psession.commit();
- }
- catch (Exception e)
- {
- fail("Should have succeeded");
- }
-
- assertTrue("Listener should not have received exception", _caught.getCount() == 1);
-
- monitor(0, 0);
- }
-
- public void testProducerOpenCommit() throws Exception
- {
- try
- {
- send(5, 0.3f);
-
- _psession.commit();
- }
- catch (Exception e)
- {
- fail("Should have succeeded");
- }
-
- assertTrue("Listener should not have received exception", _caught.getCount() == 1);
-
- monitor(0, 0);
- }
-}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTest.java
deleted file mode 100644
index c912d6a323..0000000000
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTest.java
+++ /dev/null
@@ -1,335 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.test.unit.transacted;
-
-/**
- * This tests the behaviour of transactional sessions when the {@code transactionTimeout} configuration
- * is set for a virtual host.
- *
- * A producer that is idle for too long or open for too long will have its connection closed and
- * any further operations will fail with a 408 resource timeout exception. Consumers will not
- * be affected by the transaction timeout configuration.
- */
-public class TransactionTimeoutTest extends TransactionTimeoutTestCase
-{
- public void testProducerIdle() throws Exception
- {
- try
- {
- sleep(2.0f);
-
- _psession.commit();
- }
- catch (Exception e)
- {
- fail("Should have succeeded");
- }
-
- assertTrue("Listener should not have received exception", _caught.getCount() == 1);
-
- monitor(0, 0);
- }
-
- public void testProducerIdleCommit() throws Exception
- {
- try
- {
- send(5, 0);
-
- sleep(2.0f);
-
- _psession.commit();
- fail("should fail");
- }
- catch (Exception e)
- {
- _exception = e;
- }
-
- monitor(5, 0);
-
- check(IDLE);
- }
-
- public void testProducerOpenCommit() throws Exception
- {
- try
- {
- send(6, 0.5f);
-
- _psession.commit();
- fail("should fail");
- }
- catch (Exception e)
- {
- _exception = e;
- }
-
- monitor(0, 10);
-
- check(OPEN);
- }
-
- public void testProducerIdleCommitTwice() throws Exception
- {
- try
- {
- send(5, 0);
-
- sleep(1.0f);
-
- _psession.commit();
-
- send(5, 0);
-
- sleep(2.0f);
-
- _psession.commit();
- fail("should fail");
- }
- catch (Exception e)
- {
- _exception = e;
- }
-
- monitor(10, 0);
-
- check(IDLE);
- }
-
- public void testProducerOpenCommitTwice() throws Exception
- {
- try
- {
- send(5, 0);
-
- sleep(1.0f);
-
- _psession.commit();
-
- send(6, 0.5f);
-
- _psession.commit();
- fail("should fail");
- }
- catch (Exception e)
- {
- _exception = e;
- }
-
- // the presistent store generates more idle messages?
- monitor(isBrokerStorePersistent() ? 10 : 5, 10);
-
- check(OPEN);
- }
-
- public void testProducerIdleRollback() throws Exception
- {
- try
- {
- send(5, 0);
-
- sleep(2.0f);
-
- _psession.rollback();
- fail("should fail");
- }
- catch (Exception e)
- {
- _exception = e;
- }
-
- monitor(5, 0);
-
- check(IDLE);
- }
-
- public void testProducerIdleRollbackTwice() throws Exception
- {
- try
- {
- send(5, 0);
-
- sleep(1.0f);
-
- _psession.rollback();
-
- send(5, 0);
-
- sleep(2.0f);
-
- _psession.rollback();
- fail("should fail");
- }
- catch (Exception e)
- {
- _exception = e;
- }
-
- monitor(10, 0);
-
- check(IDLE);
- }
-
- public void testConsumerCommitClose() throws Exception
- {
- try
- {
- send(1, 0);
-
- _psession.commit();
-
- expect(1, 0);
-
- _csession.commit();
-
- sleep(3.0f);
-
- _csession.close();
- }
- catch (Exception e)
- {
- fail("should have succeeded: " + e.getMessage());
- }
-
- assertTrue("Listener should not have received exception", _caught.getCount() == 1);
-
- monitor(0, 0);
- }
-
- public void testConsumerIdleReceiveCommit() throws Exception
- {
- try
- {
- send(1, 0);
-
- _psession.commit();
-
- sleep(2.0f);
-
- expect(1, 0);
-
- sleep(2.0f);
-
- _csession.commit();
- }
- catch (Exception e)
- {
- fail("Should have succeeded");
- }
-
- assertTrue("Listener should not have received exception", _caught.getCount() == 1);
-
- monitor(0, 0);
- }
-
- public void testConsumerIdleCommit() throws Exception
- {
- try
- {
- send(1, 0);
-
- _psession.commit();
-
- expect(1, 0);
-
- sleep(2.0f);
-
- _csession.commit();
- }
- catch (Exception e)
- {
- fail("Should have succeeded");
- }
-
- assertTrue("Listener should not have received exception", _caught.getCount() == 1);
-
- monitor(0, 0);
- }
-
- public void testConsumerIdleRollback() throws Exception
- {
- try
- {
- send(1, 0);
-
- _psession.commit();
-
- expect(1, 0);
-
- sleep(2.0f);
-
- _csession.rollback();
- }
- catch (Exception e)
- {
- fail("Should have succeeded");
- }
-
- assertTrue("Listener should not have received exception", _caught.getCount() == 1);
-
- monitor(0, 0);
- }
-
- public void testConsumerOpenCommit() throws Exception
- {
- try
- {
- send(1, 0);
-
- _psession.commit();
-
- sleep(3.0f);
-
- _csession.commit();
- }
- catch (Exception e)
- {
- fail("Should have succeeded");
- }
-
- assertTrue("Listener should not have received exception", _caught.getCount() == 1);
-
- monitor(0, 0);
- }
-
- public void testConsumerOpenRollback() throws Exception
- {
- try
- {
- send(1, 0);
-
- _psession.commit();
-
- sleep(3.0f);
-
- _csession.rollback();
- }
- catch (Exception e)
- {
- fail("Should have succeeded");
- }
-
- assertTrue("Listener should not have received exception", _caught.getCount() == 1);
-
- monitor(0, 0);
- }
-}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTestCase.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTestCase.java
deleted file mode 100644
index 786fc2adb0..0000000000
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTestCase.java
+++ /dev/null
@@ -1,253 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.test.unit.transacted;
-
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import javax.jms.DeliveryMode;
-import javax.jms.ExceptionListener;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.TextMessage;
-
-import junit.framework.TestCase;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQConnectionURL;
-import org.apache.qpid.client.AMQQueue;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.jms.ConnectionURL;
-import org.apache.qpid.jms.Session;
-import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
-import org.apache.qpid.util.LogMonitor;
-
-/**
- * The {@link TestCase} for transaction timeout testing.
- */
-public class TransactionTimeoutTestCase extends QpidBrokerTestCase implements ExceptionListener
-{
- public static final String VIRTUALHOST = "test";
- public static final String TEXT = "0123456789abcdefghiforgettherest";
- public static final String CHN_OPEN_TXN = "CHN-1007";
- public static final String CHN_IDLE_TXN = "CHN-1008";
- public static final String IDLE = "Idle";
- public static final String OPEN = "Open";
-
- protected LogMonitor _monitor;
- protected AMQConnection _con;
- protected Session _psession, _csession;
- protected Queue _queue;
- protected MessageConsumer _consumer;
- protected MessageProducer _producer;
- protected CountDownLatch _caught = new CountDownLatch(1);
- protected String _message;
- protected Exception _exception;
- protected AMQConstant _code;
-
- protected void configure() throws Exception
- {
- // Setup housekeeping every second
- setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".housekeeping.expiredMessageCheckPeriod", "100");
-
- /*
- * Set transaction timout properties. The XML in the virtualhosts configuration is as follows:
- *
- * <transactionTimeout>
- * <openWarn>1000</openWarn>
- * <openClose>2000</openClose>
- * <idleWarn>500</idleWarn>
- * <idleClose>1500</idleClose>
- * </transactionTimeout>
- */
- setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".transactionTimeout.openWarn", "1000");
- setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".transactionTimeout.openClose", "2000");
- setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".transactionTimeout.idleWarn", "500");
- setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".transactionTimeout.idleClose", "1000");
- }
-
- protected void setUp() throws Exception
- {
- // Configure timeouts
- configure();
-
- // Monitor log file
- _monitor = new LogMonitor(_outputFile);
-
- // Start broker
- super.setUp();
-
- // Connect to broker
- String broker = ("tcp://localhost:" + DEFAULT_PORT);
- ConnectionURL url = new AMQConnectionURL("amqp://guest:guest@clientid/test?brokerlist='" + broker + "'&maxprefetch='1'");
- _con = (AMQConnection) getConnection(url);
- _con.setExceptionListener(this);
- _con.start();
-
- // Create queue
- Session qsession = _con.createSession(true, Session.SESSION_TRANSACTED);
- AMQShortString queueName = new AMQShortString("test");
- _queue = new AMQQueue(qsession.getDefaultQueueExchangeName(), queueName, queueName, false, true);
- qsession.close();
-
- // Create producer and consumer
- producer();
- consumer();
- }
-
- protected void tearDown() throws Exception
- {
- try
- {
- _con.close();
- }
- finally
- {
- super.tearDown();
- }
- }
-
- /**
- * Create a transacted persistent message producer session.
- */
- protected void producer() throws Exception
- {
- _psession = _con.createSession(true, Session.SESSION_TRANSACTED);
- _producer = _psession.createProducer(_queue);
- _producer.setDeliveryMode(DeliveryMode.PERSISTENT);
- }
-
- /**
- * Create a transacted message consumer session.
- */
- protected void consumer() throws Exception
- {
- _csession = _con.createSession(true, Session.SESSION_TRANSACTED);
- _consumer = _csession.createConsumer(_queue);
- }
-
- /**
- * Send a number of messages to the queue, optionally pausing after each.
- */
- protected void send(int count, float delay) throws Exception
- {
- for (int i = 0; i < count; i++)
- {
- sleep(delay);
- Message msg = _psession.createTextMessage(TEXT);
- msg.setIntProperty("i", i);
- _producer.send(msg);
- }
- }
-
- /**
- * Sleep for a number of seconds.
- */
- protected void sleep(float seconds) throws Exception
- {
- try
- {
- Thread.sleep((long) (seconds * 1000.0f));
- }
- catch (InterruptedException ie)
- {
- throw new RuntimeException("Interrupted");
- }
- }
-
- /**
- * Check for idle and open messages.
- *
- * Either exactly zero messages, or +-2 error accepted around the specified number.
- */
- protected void monitor(int idle, int open) throws Exception
- {
- List<String> idleMsgs = _monitor.findMatches(CHN_IDLE_TXN);
- List<String> openMsgs = _monitor.findMatches(CHN_OPEN_TXN);
-
- String idleErr = "Expected " + idle + " but found " + idleMsgs.size() + " txn idle messages";
- String openErr = "Expected " + open + " but found " + openMsgs.size() + " txn open messages";
-
- if (idle == 0)
- {
- assertTrue(idleErr, idleMsgs.isEmpty());
- }
- else
- {
- assertTrue(idleErr, idleMsgs.size() >= idle - 2 && idleMsgs.size() <= idle + 2);
- }
-
- if (open == 0)
- {
- assertTrue(openErr, openMsgs.isEmpty());
- }
- else
- {
- assertTrue(openErr, openMsgs.size() >= open - 2 && openMsgs.size() <= open + 2);
- }
- }
-
- /**
- * Receive a number of messages, optionally pausing after each.
- */
- protected void expect(int count, float delay) throws Exception
- {
- for (int i = 0; i < count; i++)
- {
- sleep(delay);
- Message msg = _consumer.receive(1000);
- assertNotNull("Message should not be null", msg);
- assertTrue("Message should be a text message", msg instanceof TextMessage);
- assertEquals("Message content does not match expected", TEXT, ((TextMessage) msg).getText());
- assertEquals("Message order is incorrect", i, msg.getIntProperty("i"));
- }
- }
-
- /**
- * Checks that the correct exception was thrown and was received
- * by the listener with a 506 error code.
- */
- protected void check(String reason)throws InterruptedException
- {
- assertTrue("Should have caught exception in listener", _caught.await(1, TimeUnit.SECONDS));
- assertNotNull("Should have thrown exception to client", _exception);
- assertTrue("Exception message should contain '" + reason + "': " + _message, _message.contains(reason + " transaction timed out"));
- assertNotNull("Exception should have an error code", _code);
- assertEquals("Error code should be 506", AMQConstant.RESOURCE_ERROR, _code);
- }
-
- /** @see javax.jms.ExceptionListener#onException(javax.jms.JMSException) */
- public void onException(JMSException jmse)
- {
- _caught.countDown();
- _message = jmse.getLinkedException().getMessage();
- if (jmse.getLinkedException() instanceof AMQException)
- {
- _code = ((AMQException) jmse.getLinkedException()).getErrorCode();
- }
- }
-}