summaryrefslogtreecommitdiff
path: root/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityTest.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/systests/src/main/java/org/apache/qpid/server/queue/PriorityTest.java')
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/queue/PriorityTest.java168
1 files changed, 102 insertions, 66 deletions
diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityTest.java
index 0dbf95052f..ca38807fb1 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityTest.java
@@ -29,6 +29,7 @@ import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.jndi.PropertiesFileInitialContextFactory;
+import org.apache.qpid.test.utils.QpidTestCase;
import org.apache.qpid.url.URLSyntaxException;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
@@ -42,106 +43,71 @@ import java.util.Hashtable;
import java.util.HashMap;
import java.util.Map;
-/** Test Case provided by client Non-functional Test NF101: heap exhaustion behaviour */
-public class PriorityTest extends TestCase
+public class PriorityTest extends QpidTestCase
{
- private static final Logger _logger = Logger.getLogger(PriorityTest.class);
+ private static final int TIMEOUT = 1500;
- protected final String BROKER = "vm://:1";
- protected final String VHOST = "/test";
- protected final String QUEUE = "PriorityQueue";
+ private static final Logger _logger = Logger.getLogger(PriorityTest.class);
+ protected final String QUEUE = "PriorityQueue";
private static final int MSG_COUNT = 50;
+ private Connection producerConnection;
+ private MessageProducer producer;
+ private Session producerSession;
+ private Queue queue;
+ private Connection consumerConnection;
+ private Session consumerSession;
+
+
+ private MessageConsumer consumer;
+
protected void setUp() throws Exception
{
super.setUp();
- if (usingInVMBroker())
- {
- TransportConnection.createVMBroker(1);
- }
-
-
- }
+ producerConnection = getConnection();
+ producerSession = producerConnection.createSession(true, Session.AUTO_ACKNOWLEDGE);
- private boolean usingInVMBroker()
- {
- return BROKER.startsWith("vm://");
+ producerConnection.start();
+
+ consumerConnection = getConnection();
+ consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
}
protected void tearDown() throws Exception
{
- if (usingInVMBroker())
- {
- TransportConnection.killAllVMBrokers();
- }
+ producerConnection.close();
+ consumerConnection.close();
super.tearDown();
}
public void testPriority() throws JMSException, NamingException, AMQException
{
- InitialContextFactory factory = new PropertiesFileInitialContextFactory();
-
- Hashtable<String, String> env = new Hashtable<String, String>();
-
- env.put("connectionfactory.connection", "amqp://guest:guest@PRIORITY_TEST_ID" + VHOST + "?brokerlist='" + BROKER + "'");
- env.put("queue.queue", QUEUE);
-
- Context context = factory.getInitialContext(env);
-
- Connection producerConnection = ((ConnectionFactory) context.lookup("connection")).createConnection();
-
- Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
final Map<String,Object> arguments = new HashMap<String, Object>();
arguments.put("x-qpid-priorities",10);
-
((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), true, false, false, arguments);
-
- Queue queue = new AMQQueue("amq.direct",QUEUE);
+ queue = (Queue) producerSession.createQueue("direct://amq.direct/"+QUEUE+"/"+QUEUE+"?durable='false'&autodelete='true'");
((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
-
-
-
-
-
-
- producerConnection.start();
-
-
- MessageProducer producer = producerSession.createProducer(queue);
-
-
-
-
+ producer = producerSession.createProducer(queue);
for (int msg = 0; msg < MSG_COUNT; msg++)
{
producer.setPriority(msg % 10);
producer.send(nextMessage(msg, false, producerSession, producer));
}
-
+ producerSession.commit();
producer.close();
producerSession.close();
producerConnection.close();
-
- Connection consumerConnection = ((ConnectionFactory) context.lookup("connection")).createConnection();
- Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer consumer = consumerSession.createConsumer(queue);
-
-
-
-
+ consumer = consumerSession.createConsumer(queue);
consumerConnection.start();
-
Message received;
- //Receive Message 0
- StringBuilder buf = new StringBuilder();
int receivedCount = 0;
Message previous = null;
int messageCount = 0;
@@ -158,10 +124,80 @@ public class PriorityTest extends TestCase
}
assertEquals("Incorrect number of message received", 50, receivedCount);
-
- producerSession.close();
- producer.close();
-
+ }
+
+ public void testOddOrdering() throws AMQException, JMSException
+ {
+ final Map<String,Object> arguments = new HashMap<String, Object>();
+ arguments.put("x-qpid-priorities",3);
+ ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), true, false, false, arguments);
+ queue = producerSession.createQueue("direct://amq.direct/"+QUEUE+"/"+QUEUE+"?durable='false'&autodelete='true'");
+
+ ((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
+ producer = producerSession.createProducer(queue);
+
+ // In order ABC
+ producer.setPriority(9);
+ producer.send(nextMessage(1, false, producerSession, producer));
+ producer.setPriority(4);
+ producer.send(nextMessage(2, false, producerSession, producer));
+ producer.setPriority(1);
+ producer.send(nextMessage(3, false, producerSession, producer));
+
+ // Out of order BAC
+ producer.setPriority(4);
+ producer.send(nextMessage(4, false, producerSession, producer));
+ producer.setPriority(9);
+ producer.send(nextMessage(5, false, producerSession, producer));
+ producer.setPriority(1);
+ producer.send(nextMessage(6, false, producerSession, producer));
+
+ // Out of order BCA
+ producer.setPriority(4);
+ producer.send(nextMessage(7, false, producerSession, producer));
+ producer.setPriority(1);
+ producer.send(nextMessage(8, false, producerSession, producer));
+ producer.setPriority(9);
+ producer.send(nextMessage(9, false, producerSession, producer));
+
+ // Reverse order CBA
+ producer.setPriority(1);
+ producer.send(nextMessage(10, false, producerSession, producer));
+ producer.setPriority(4);
+ producer.send(nextMessage(11, false, producerSession, producer));
+ producer.setPriority(9);
+ producer.send(nextMessage(12, false, producerSession, producer));
+ producerSession.commit();
+
+ consumer = consumerSession.createConsumer(queue);
+ consumerConnection.start();
+
+ Message msg = consumer.receive(TIMEOUT);
+ assertEquals(1, msg.getIntProperty("msg"));
+ msg = consumer.receive(TIMEOUT);
+ assertEquals(5, msg.getIntProperty("msg"));
+ msg = consumer.receive(TIMEOUT);
+ assertEquals(9, msg.getIntProperty("msg"));
+ msg = consumer.receive(TIMEOUT);
+ assertEquals(12, msg.getIntProperty("msg"));
+
+ msg = consumer.receive(TIMEOUT);
+ assertEquals(2, msg.getIntProperty("msg"));
+ msg = consumer.receive(TIMEOUT);
+ assertEquals(4, msg.getIntProperty("msg"));
+ msg = consumer.receive(TIMEOUT);
+ assertEquals(7, msg.getIntProperty("msg"));
+ msg = consumer.receive(TIMEOUT);
+ assertEquals(11, msg.getIntProperty("msg"));
+
+ msg = consumer.receive(TIMEOUT);
+ assertEquals(3, msg.getIntProperty("msg"));
+ msg = consumer.receive(TIMEOUT);
+ assertEquals(6, msg.getIntProperty("msg"));
+ msg = consumer.receive(TIMEOUT);
+ assertEquals(8, msg.getIntProperty("msg"));
+ msg = consumer.receive(TIMEOUT);
+ assertEquals(10, msg.getIntProperty("msg"));
}
private Message nextMessage(int msg, boolean first, Session producerSession, MessageProducer producer) throws JMSException