summaryrefslogtreecommitdiff
path: root/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityTest.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityTest.java')
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityTest.java212
1 files changed, 212 insertions, 0 deletions
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityTest.java
new file mode 100644
index 0000000000..6203e8a194
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityTest.java
@@ -0,0 +1,212 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+package org.apache.qpid.server.queue;
+
+import junit.framework.TestCase;
+import junit.framework.Assert;
+import org.apache.log4j.Logger;
+import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.jndi.PropertiesFileInitialContextFactory;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.apache.qpid.url.URLSyntaxException;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+
+import javax.jms.*;
+import javax.naming.NamingException;
+import javax.naming.Context;
+import javax.naming.spi.InitialContextFactory;
+import java.util.Hashtable;
+import java.util.HashMap;
+import java.util.Map;
+
+public class PriorityTest extends QpidBrokerTestCase
+{
+ private static final int TIMEOUT = 1500;
+
+
+ private static final Logger _logger = Logger.getLogger(PriorityTest.class);
+
+ protected final String QUEUE = "PriorityQueue";
+
+ private static final int MSG_COUNT = 50;
+
+ private Connection producerConnection;
+ private MessageProducer producer;
+ private Session producerSession;
+ private Queue queue;
+ private Connection consumerConnection;
+ private Session consumerSession;
+
+
+ private MessageConsumer consumer;
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ producerConnection = getConnection();
+ producerSession = producerConnection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+
+ producerConnection.start();
+
+ consumerConnection = getConnection();
+ consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ }
+
+ protected void tearDown() throws Exception
+ {
+ producerConnection.close();
+ consumerConnection.close();
+ super.tearDown();
+ }
+
+ public void testPriority() throws JMSException, NamingException, AMQException
+ {
+ final Map<String,Object> arguments = new HashMap<String, Object>();
+ arguments.put("x-qpid-priorities",10);
+ ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), true, false, false, arguments);
+ queue = (Queue) producerSession.createQueue("direct://amq.direct/"+QUEUE+"/"+QUEUE+"?durable='false'&autodelete='true'");
+
+ ((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
+ producer = producerSession.createProducer(queue);
+
+ for (int msg = 0; msg < MSG_COUNT; msg++)
+ {
+ producer.setPriority(msg % 10);
+ producer.send(nextMessage(msg, false, producerSession, producer));
+ }
+ producerSession.commit();
+ producer.close();
+ producerSession.close();
+ producerConnection.close();
+
+ consumer = consumerSession.createConsumer(queue);
+ consumerConnection.start();
+ Message received;
+ int receivedCount = 0;
+ Message previous = null;
+ int messageCount = 0;
+ while((received = consumer.receive(1000))!=null)
+ {
+ messageCount++;
+ if(previous != null)
+ {
+ assertTrue("Messages arrived in unexpected order " + messageCount + " " + previous.getIntProperty("msg") + " " + received.getIntProperty("msg") + " " + previous.getJMSPriority() + " " + received.getJMSPriority(), (previous.getJMSPriority() > received.getJMSPriority()) || ((previous.getJMSPriority() == received.getJMSPriority()) && previous.getIntProperty("msg") < received.getIntProperty("msg")) );
+ }
+
+ previous = received;
+ receivedCount++;
+ }
+
+ assertEquals("Incorrect number of message received", 50, receivedCount);
+ }
+
+ public void testOddOrdering() throws AMQException, JMSException
+ {
+ final Map<String,Object> arguments = new HashMap<String, Object>();
+ arguments.put("x-qpid-priorities",3);
+ ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), true, false, false, arguments);
+ queue = producerSession.createQueue("direct://amq.direct/"+QUEUE+"/"+QUEUE+"?durable='false'&autodelete='true'");
+
+ ((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
+ producer = producerSession.createProducer(queue);
+
+ // In order ABC
+ producer.setPriority(9);
+ producer.send(nextMessage(1, false, producerSession, producer));
+ producer.setPriority(4);
+ producer.send(nextMessage(2, false, producerSession, producer));
+ producer.setPriority(1);
+ producer.send(nextMessage(3, false, producerSession, producer));
+
+ // Out of order BAC
+ producer.setPriority(4);
+ producer.send(nextMessage(4, false, producerSession, producer));
+ producer.setPriority(9);
+ producer.send(nextMessage(5, false, producerSession, producer));
+ producer.setPriority(1);
+ producer.send(nextMessage(6, false, producerSession, producer));
+
+ // Out of order BCA
+ producer.setPriority(4);
+ producer.send(nextMessage(7, false, producerSession, producer));
+ producer.setPriority(1);
+ producer.send(nextMessage(8, false, producerSession, producer));
+ producer.setPriority(9);
+ producer.send(nextMessage(9, false, producerSession, producer));
+
+ // Reverse order CBA
+ producer.setPriority(1);
+ producer.send(nextMessage(10, false, producerSession, producer));
+ producer.setPriority(4);
+ producer.send(nextMessage(11, false, producerSession, producer));
+ producer.setPriority(9);
+ producer.send(nextMessage(12, false, producerSession, producer));
+ producerSession.commit();
+
+ consumer = consumerSession.createConsumer(queue);
+ consumerConnection.start();
+
+ Message msg = consumer.receive(TIMEOUT);
+ assertEquals(1, msg.getIntProperty("msg"));
+ msg = consumer.receive(TIMEOUT);
+ assertEquals(5, msg.getIntProperty("msg"));
+ msg = consumer.receive(TIMEOUT);
+ assertEquals(9, msg.getIntProperty("msg"));
+ msg = consumer.receive(TIMEOUT);
+ assertEquals(12, msg.getIntProperty("msg"));
+
+ msg = consumer.receive(TIMEOUT);
+ assertEquals(2, msg.getIntProperty("msg"));
+ msg = consumer.receive(TIMEOUT);
+ assertEquals(4, msg.getIntProperty("msg"));
+ msg = consumer.receive(TIMEOUT);
+ assertEquals(7, msg.getIntProperty("msg"));
+ msg = consumer.receive(TIMEOUT);
+ assertEquals(11, msg.getIntProperty("msg"));
+
+ msg = consumer.receive(TIMEOUT);
+ assertEquals(3, msg.getIntProperty("msg"));
+ msg = consumer.receive(TIMEOUT);
+ assertEquals(6, msg.getIntProperty("msg"));
+ msg = consumer.receive(TIMEOUT);
+ assertEquals(8, msg.getIntProperty("msg"));
+ msg = consumer.receive(TIMEOUT);
+ assertEquals(10, msg.getIntProperty("msg"));
+ }
+
+ private Message nextMessage(int msg, boolean first, Session producerSession, MessageProducer producer) throws JMSException
+ {
+ Message send = producerSession.createTextMessage("Message: " + msg);
+ send.setIntProperty("msg", msg);
+
+ return send;
+ }
+
+
+}