diff options
Diffstat (limited to 'java/systests/src/main/java/org/apache/qpid/server/queue/PriorityTest.java')
-rw-r--r-- | java/systests/src/main/java/org/apache/qpid/server/queue/PriorityTest.java | 168 |
1 files changed, 102 insertions, 66 deletions
diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityTest.java index 0dbf95052f..ca38807fb1 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityTest.java @@ -29,6 +29,7 @@ import org.apache.qpid.client.AMQSession; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQDestination; import org.apache.qpid.jndi.PropertiesFileInitialContextFactory; +import org.apache.qpid.test.utils.QpidTestCase; import org.apache.qpid.url.URLSyntaxException; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; @@ -42,106 +43,71 @@ import java.util.Hashtable; import java.util.HashMap; import java.util.Map; -/** Test Case provided by client Non-functional Test NF101: heap exhaustion behaviour */ -public class PriorityTest extends TestCase +public class PriorityTest extends QpidTestCase { - private static final Logger _logger = Logger.getLogger(PriorityTest.class); + private static final int TIMEOUT = 1500; - protected final String BROKER = "vm://:1"; - protected final String VHOST = "/test"; - protected final String QUEUE = "PriorityQueue"; + private static final Logger _logger = Logger.getLogger(PriorityTest.class); + protected final String QUEUE = "PriorityQueue"; private static final int MSG_COUNT = 50; + private Connection producerConnection; + private MessageProducer producer; + private Session producerSession; + private Queue queue; + private Connection consumerConnection; + private Session consumerSession; + + + private MessageConsumer consumer; + protected void setUp() throws Exception { super.setUp(); - if (usingInVMBroker()) - { - TransportConnection.createVMBroker(1); - } - - - } + producerConnection = getConnection(); + producerSession = producerConnection.createSession(true, Session.AUTO_ACKNOWLEDGE); - private boolean usingInVMBroker() - { - return BROKER.startsWith("vm://"); + producerConnection.start(); + + consumerConnection = getConnection(); + consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + } protected void tearDown() throws Exception { - if (usingInVMBroker()) - { - TransportConnection.killAllVMBrokers(); - } + producerConnection.close(); + consumerConnection.close(); super.tearDown(); } public void testPriority() throws JMSException, NamingException, AMQException { - InitialContextFactory factory = new PropertiesFileInitialContextFactory(); - - Hashtable<String, String> env = new Hashtable<String, String>(); - - env.put("connectionfactory.connection", "amqp://guest:guest@PRIORITY_TEST_ID" + VHOST + "?brokerlist='" + BROKER + "'"); - env.put("queue.queue", QUEUE); - - Context context = factory.getInitialContext(env); - - Connection producerConnection = ((ConnectionFactory) context.lookup("connection")).createConnection(); - - Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - final Map<String,Object> arguments = new HashMap<String, Object>(); arguments.put("x-qpid-priorities",10); - ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), true, false, false, arguments); - - Queue queue = new AMQQueue("amq.direct",QUEUE); + queue = (Queue) producerSession.createQueue("direct://amq.direct/"+QUEUE+"/"+QUEUE+"?durable='false'&autodelete='true'"); ((AMQSession) producerSession).declareAndBind((AMQDestination)queue); - - - - - - - producerConnection.start(); - - - MessageProducer producer = producerSession.createProducer(queue); - - - - + producer = producerSession.createProducer(queue); for (int msg = 0; msg < MSG_COUNT; msg++) { producer.setPriority(msg % 10); producer.send(nextMessage(msg, false, producerSession, producer)); } - + producerSession.commit(); producer.close(); producerSession.close(); producerConnection.close(); - - Connection consumerConnection = ((ConnectionFactory) context.lookup("connection")).createConnection(); - Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageConsumer consumer = consumerSession.createConsumer(queue); - - - - + consumer = consumerSession.createConsumer(queue); consumerConnection.start(); - Message received; - //Receive Message 0 - StringBuilder buf = new StringBuilder(); int receivedCount = 0; Message previous = null; int messageCount = 0; @@ -158,10 +124,80 @@ public class PriorityTest extends TestCase } assertEquals("Incorrect number of message received", 50, receivedCount); - - producerSession.close(); - producer.close(); - + } + + public void testOddOrdering() throws AMQException, JMSException + { + final Map<String,Object> arguments = new HashMap<String, Object>(); + arguments.put("x-qpid-priorities",3); + ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), true, false, false, arguments); + queue = producerSession.createQueue("direct://amq.direct/"+QUEUE+"/"+QUEUE+"?durable='false'&autodelete='true'"); + + ((AMQSession) producerSession).declareAndBind((AMQDestination)queue); + producer = producerSession.createProducer(queue); + + // In order ABC + producer.setPriority(9); + producer.send(nextMessage(1, false, producerSession, producer)); + producer.setPriority(4); + producer.send(nextMessage(2, false, producerSession, producer)); + producer.setPriority(1); + producer.send(nextMessage(3, false, producerSession, producer)); + + // Out of order BAC + producer.setPriority(4); + producer.send(nextMessage(4, false, producerSession, producer)); + producer.setPriority(9); + producer.send(nextMessage(5, false, producerSession, producer)); + producer.setPriority(1); + producer.send(nextMessage(6, false, producerSession, producer)); + + // Out of order BCA + producer.setPriority(4); + producer.send(nextMessage(7, false, producerSession, producer)); + producer.setPriority(1); + producer.send(nextMessage(8, false, producerSession, producer)); + producer.setPriority(9); + producer.send(nextMessage(9, false, producerSession, producer)); + + // Reverse order CBA + producer.setPriority(1); + producer.send(nextMessage(10, false, producerSession, producer)); + producer.setPriority(4); + producer.send(nextMessage(11, false, producerSession, producer)); + producer.setPriority(9); + producer.send(nextMessage(12, false, producerSession, producer)); + producerSession.commit(); + + consumer = consumerSession.createConsumer(queue); + consumerConnection.start(); + + Message msg = consumer.receive(TIMEOUT); + assertEquals(1, msg.getIntProperty("msg")); + msg = consumer.receive(TIMEOUT); + assertEquals(5, msg.getIntProperty("msg")); + msg = consumer.receive(TIMEOUT); + assertEquals(9, msg.getIntProperty("msg")); + msg = consumer.receive(TIMEOUT); + assertEquals(12, msg.getIntProperty("msg")); + + msg = consumer.receive(TIMEOUT); + assertEquals(2, msg.getIntProperty("msg")); + msg = consumer.receive(TIMEOUT); + assertEquals(4, msg.getIntProperty("msg")); + msg = consumer.receive(TIMEOUT); + assertEquals(7, msg.getIntProperty("msg")); + msg = consumer.receive(TIMEOUT); + assertEquals(11, msg.getIntProperty("msg")); + + msg = consumer.receive(TIMEOUT); + assertEquals(3, msg.getIntProperty("msg")); + msg = consumer.receive(TIMEOUT); + assertEquals(6, msg.getIntProperty("msg")); + msg = consumer.receive(TIMEOUT); + assertEquals(8, msg.getIntProperty("msg")); + msg = consumer.receive(TIMEOUT); + assertEquals(10, msg.getIntProperty("msg")); } private Message nextMessage(int msg, boolean first, Session producerSession, MessageProducer producer) throws JMSException |