diff options
Diffstat (limited to 'java/systests/src/main/java/org/apache/qpid/server/queue/PriorityTest.java')
-rw-r--r-- | java/systests/src/main/java/org/apache/qpid/server/queue/PriorityTest.java | 171 |
1 files changed, 171 insertions, 0 deletions
diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityTest.java new file mode 100644 index 0000000000..3d53b87b45 --- /dev/null +++ b/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityTest.java @@ -0,0 +1,171 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +package org.apache.qpid.server.queue; + +import junit.framework.TestCase; +import junit.framework.Assert; +import org.apache.log4j.Logger; +import org.apache.qpid.client.transport.TransportConnection; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.jndi.PropertiesFileInitialContextFactory; +import org.apache.qpid.url.URLSyntaxException; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.FieldTable; + +import javax.jms.*; +import javax.naming.NamingException; +import javax.naming.Context; +import javax.naming.spi.InitialContextFactory; +import java.util.Hashtable; + +/** Test Case provided by client Non-functional Test NF101: heap exhaustion behaviour */ +public class PriorityTest extends TestCase +{ + private static final Logger _logger = Logger.getLogger(PriorityTest.class); + + + protected final String BROKER = "vm://:1"; + protected final String VHOST = "/test"; + protected final String QUEUE = "PriorityQueue"; + + + private static final int MSG_COUNT = 50; + + protected void setUp() throws Exception + { + super.setUp(); + + if (usingInVMBroker()) + { + TransportConnection.createVMBroker(1); + } + + + } + + private boolean usingInVMBroker() + { + return BROKER.startsWith("vm://"); + } + + protected void tearDown() throws Exception + { + if (usingInVMBroker()) + { + TransportConnection.killAllVMBrokers(); + } + super.tearDown(); + } + + public void testPriority() throws JMSException, NamingException, AMQException + { + InitialContextFactory factory = new PropertiesFileInitialContextFactory(); + + Hashtable<String, String> env = new Hashtable<String, String>(); + + env.put("connectionfactory.connection", "amqp://guest:guest@TTL_TEST_ID" + VHOST + "?brokerlist='" + BROKER + "'"); + env.put("queue.queue", QUEUE); + + Context context = factory.getInitialContext(env); + + Connection producerConnection = ((ConnectionFactory) context.lookup("connection")).createConnection(); + + Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + final FieldTable arguments = new FieldTable(); + arguments.put(new AMQShortString("x-qpid-priorities"),10); + + ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), true, false, false, arguments); + + Queue queue = (Queue) context.lookup("queue"); + + ((AMQSession) producerSession).declareAndBind((AMQDestination)queue); + + + + + + + producerConnection.start(); + + + MessageProducer producer = producerSession.createProducer(queue); + + + + + + for (int msg = 0; msg < MSG_COUNT; msg++) + { + producer.setPriority(msg % 10); + producer.send(nextMessage(msg, false, producerSession, producer)); + } + + producer.close(); + producerSession.close(); + producerConnection.close(); + + + Connection consumerConnection = ((ConnectionFactory) context.lookup("connection")).createConnection(); + Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = consumerSession.createConsumer(queue); + + + + + consumerConnection.start(); + + Message received; + //Receive Message 0 + StringBuilder buf = new StringBuilder(); + int receivedCount = 0; + Message previous = null; + while((received = consumer.receive(1000))!=null) + { + if(previous != null) + { + assertTrue("Messages arrived in unexpected order", (previous.getJMSPriority() > received.getJMSPriority()) || ((previous.getJMSPriority() == received.getJMSPriority()) && previous.getIntProperty("msg") < received.getIntProperty("msg")) ); + } + + previous = received; + receivedCount++; + } + + assertEquals("Incorrect number of message received", 50, receivedCount); + + producerSession.close(); + producer.close(); + + } + + private Message nextMessage(int msg, boolean first, Session producerSession, MessageProducer producer) throws JMSException + { + Message send = producerSession.createTextMessage("Message: " + msg); + send.setIntProperty("msg", msg); + + return send; + } + + +} |