summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Gemmell <robbie@apache.org>2011-03-25 13:00:53 +0000
committerRobert Gemmell <robbie@apache.org>2011-03-25 13:00:53 +0000
commit19f781317e57fc3679ebba1e26176f2d7976e877 (patch)
tree033d7f3edfad1036475dd74776c3f226433472ce
parent2376cb9ffc404fe10ac68d357fb7219175302012 (diff)
downloadqpid-python-19f781317e57fc3679ebba1e26176f2d7976e877.tar.gz
QPID-3166: add system test using multiple batch transacted producers with multiple consumers using unique selectors. Exposes issue detailed in QPID-3165.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.5.x-dev@1085353 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MultipleTransactedBatchProducerTest.java247
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java5
2 files changed, 250 insertions, 2 deletions
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MultipleTransactedBatchProducerTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MultipleTransactedBatchProducerTest.java
new file mode 100644
index 0000000000..cfa03931c6
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MultipleTransactedBatchProducerTest.java
@@ -0,0 +1,247 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.test.utils.QpidTestCase;
+
+/**
+ * MultipleTransactedBatchProducerTest
+ *
+ * Summary:
+ * When there are multiple producers submitting batches of messages to a given
+ * queue using transacted sessions, it is highly probable that concurrent
+ * enqueue() activity will occur and attempt delivery of their message to the
+ * same subscription. In this scenario it is likely that one of the attempts
+ * will succeed and the other will result in use of the deliverAsync() method
+ * to start a queue Runner and ensure delivery of the message.
+ *
+ * A defect within the processQueue() method used by the Runner would mean that
+ * delivery of these messages may not occur, should the Runner stop before all
+ * messages have been processed. Such a defect was discovered and found to be
+ * most visible when Selectors are used such that one and only one subscription
+ * can/will accept any given message, but multiple subscriptions are present,
+ * and one of the earlier subscriptions receives more messages than the others.
+ *
+ * This test is to validate that the processQueue() method is able to correctly
+ * deliver all of the messages present for asynchronous delivery to subscriptions,
+ * by utilising multiple batch transacted producers to create the scenario and
+ * ensure all messages are received by a consumer.
+ */
+public class MultipleTransactedBatchProducerTest extends QpidTestCase
+{
+ private static final Logger _logger = Logger.getLogger(MultipleTransactedBatchProducerTest.class);
+
+ private static final int MESSAGE_COUNT = 1000;
+ private static final int BATCH_SIZE = 50;
+ private static final int NUM_PRODUCERS = 2;
+ private static final int NUM_CONSUMERS = 3;
+ private static final Random RANDOM = new Random();
+
+ private CountDownLatch _receivedLatch;
+ private String _queueName;
+
+ private String _failMsg;
+
+ public void setUp() throws Exception
+ {
+ //debug level logging often makes this test pass artificially, turn the level down to info.
+ setSystemProperty("amqj.server.logging.level", "INFO");
+ _receivedLatch = new CountDownLatch(MESSAGE_COUNT * NUM_PRODUCERS);
+ setConfigurationProperty("management.enabled", "true");
+ super.setUp();
+ _queueName = getTestQueueName();
+ _failMsg = null;
+ }
+
+ public void testMultipleBatchedProducersWithMultipleConsumersUsingSelectors() throws Exception
+ {
+ String selector1 = ("(\"" + _queueName +"\" % " + NUM_CONSUMERS + ") = 0");
+ String selector2 = ("(\"" + _queueName +"\" % " + NUM_CONSUMERS + ") = 1");
+ String selector3 = ("(\"" + _queueName +"\" % " + NUM_CONSUMERS + ") = 2");
+
+ //create consumers
+ Connection conn1 = getConnection();
+ conn1.setExceptionListener(new ExceptionHandler("conn1"));
+ Session sess1 = conn1.createSession(true, Session.SESSION_TRANSACTED);
+ MessageConsumer cons1 = sess1.createConsumer(sess1.createQueue(_queueName), selector1);
+ cons1.setMessageListener(new Cons(sess1,"consumer1"));
+
+ Connection conn2 = getConnection();
+ conn2.setExceptionListener(new ExceptionHandler("conn2"));
+ Session sess2 = conn2.createSession(true, Session.SESSION_TRANSACTED);
+ MessageConsumer cons2 = sess2.createConsumer(sess2.createQueue(_queueName), selector2);
+ cons2.setMessageListener(new Cons(sess2,"consumer2"));
+
+ Connection conn3 = getConnection();
+ conn3.setExceptionListener(new ExceptionHandler("conn3"));
+ Session sess3 = conn3.createSession(true, Session.SESSION_TRANSACTED);
+ MessageConsumer cons3 = sess3.createConsumer(sess3.createQueue(_queueName), selector3);
+ cons3.setMessageListener(new Cons(sess3,"consumer3"));
+
+ conn1.start();
+ conn2.start();
+ conn3.start();
+
+ //create producers
+ Connection connA = getConnection();
+ connA.setExceptionListener(new ExceptionHandler("connA"));
+ Connection connB = getConnection();
+ connB.setExceptionListener(new ExceptionHandler("connB"));
+ Thread producer1 = new Thread(new ProducerThread(connA, _queueName, "producer1"));
+ Thread producer2 = new Thread(new ProducerThread(connB, _queueName, "producer2"));
+
+ producer1.start();
+ Thread.sleep(10);
+ producer2.start();
+
+ //await delivery of the messages
+ boolean result = _receivedLatch.await(75, TimeUnit.SECONDS);
+
+ assertNull("Test failed because: " + String.valueOf(_failMsg), _failMsg);
+ assertTrue("Some of the messages were not all recieved in the given timeframe, remaining count was: "+_receivedLatch.getCount(),
+ result);
+
+ }
+
+ @Override
+ public Message createNextMessage(Session session, int msgCount) throws JMSException
+ {
+ Message message = super.createNextMessage(session,msgCount);
+
+ //bias at least 50% of the messages to the first consumers selector
+ int val;
+ if (msgCount % 2 == 0)
+ {
+ val = 0;
+ }
+ else
+ {
+ val = RANDOM.nextInt(Integer.MAX_VALUE);
+ }
+
+ message.setIntProperty(_queueName, val);
+
+ return message;
+ }
+
+ private class Cons implements MessageListener
+ {
+ private Session _sess;
+ private String _desc;
+
+ public Cons(Session sess, String desc)
+ {
+ _sess = sess;
+ _desc = desc;
+ }
+
+ public void onMessage(Message message)
+ {
+ _receivedLatch.countDown();
+ int msgCount = 0;
+ int msgID = 0;
+ try
+ {
+ msgCount = message.getIntProperty(INDEX);
+ msgID = message.getIntProperty(_queueName);
+ }
+ catch (JMSException e)
+ {
+ _logger.error(_desc + " received exception: " + e.getMessage(), e);
+ failAsyncTest(e.getMessage());
+ }
+
+ _logger.info("Consumer received message:"+ msgCount + " with ID: " + msgID);
+
+ try
+ {
+ _sess.commit();
+ }
+ catch (JMSException e)
+ {
+ _logger.error(_desc + " received exception: " + e.getMessage(), e);
+ failAsyncTest(e.getMessage());
+ }
+ }
+ }
+
+ private class ProducerThread implements Runnable
+ {
+ private Connection _conn;
+ private String _dest;
+ private String _desc;
+
+ public ProducerThread(Connection conn, String dest, String desc)
+ {
+ _conn = conn;
+ _dest = dest;
+ _desc = desc;
+ }
+
+ public void run()
+ {
+ try
+ {
+ Session session = _conn.createSession(true, Session.SESSION_TRANSACTED);
+ sendMessage(session, session.createQueue(_dest), MESSAGE_COUNT, BATCH_SIZE);
+ }
+ catch (Exception e)
+ {
+ _logger.error(_desc + " received exception: " + e.getMessage(), e);
+ failAsyncTest(e.getMessage());
+ }
+ }
+ }
+
+ private class ExceptionHandler implements javax.jms.ExceptionListener
+ {
+ private String _desc;
+
+ public ExceptionHandler(String description)
+ {
+ _desc = description;
+ }
+
+ public void onException(JMSException e)
+ {
+ _logger.error(_desc + " received exception: " + e.getMessage(), e);
+ failAsyncTest(e.getMessage());
+ }
+ }
+
+ private void failAsyncTest(String msg)
+ {
+ _logger.error("Failing test because: " + msg);
+ _failMsg = msg;
+ }
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java
index 390d718c9d..4d3492790f 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java
@@ -1209,7 +1209,8 @@ public class QpidTestCase extends TestCase
MessageProducer producer = session.createProducer(destination);
- for (int i = offset; i < (count + offset); i++)
+ int i = offset;
+ for (; i < (count + offset); i++)
{
Message next = createNextMessage(session, i);
@@ -1232,7 +1233,7 @@ public class QpidTestCase extends TestCase
// we have no batchSize or
// our count is not divible by batchSize.
if (session.getTransacted() &&
- ( batchSize == 0 || count % batchSize != 0))
+ ( batchSize == 0 || (i-1) % batchSize != 0))
{
session.commit();
}