summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2012-09-19 12:53:30 +0000
committerKeith Wall <kwall@apache.org>2012-09-19 12:53:30 +0000
commit16551bcc5e6adda658cea2bc8b9e956adf1b1d8e (patch)
treef385a3014bdd0876606bcdb4363723b8f61a071b
parentc90dd9d1f079e897ffc6d7056357440308c51dcb (diff)
downloadqpid-python-16551bcc5e6adda658cea2bc8b9e956adf1b1d8e.tar.gz
QPID-4321: Perf tests should not try to call Message#acknowledge on a producing session
* ProducerParticipant makes erroneous call to Message#acknowledge * Externalise the poll timeout used by QpidQueueCreator to drain the queue after test * Topic-AckModes.js - replace tests for client-ack, dups-okay-ack with session transacted git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1387565 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/perftests/etc/testdefs/Topic-AckModes.js7
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/disttest/client/ConsumerParticipant.java4
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/disttest/client/ProducerParticipant.java4
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/disttest/jms/ClientJmsDelegate.java54
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/disttest/jms/QpidQueueCreator.java33
-rw-r--r--java/perftests/src/test/java/org/apache/qpid/disttest/client/ConsumerParticipantTest.java10
-rw-r--r--java/perftests/src/test/java/org/apache/qpid/disttest/client/ProducerParticipantTest.java10
7 files changed, 61 insertions, 61 deletions
diff --git a/java/perftests/etc/testdefs/Topic-AckModes.js b/java/perftests/etc/testdefs/Topic-AckModes.js
index 63c4b8646e..31da633695 100644
--- a/java/perftests/etc/testdefs/Topic-AckModes.js
+++ b/java/perftests/etc/testdefs/Topic-AckModes.js
@@ -27,13 +27,10 @@ var jsonObject = {
"_name": "Topic ack modes",
"_iterations": [
{
- "_acknowledgeMode": 1
- },
- {
- "_acknowledgeMode": 2
+ "_acknowledgeMode": 0
},
{
- "_acknowledgeMode": 3
+ "_acknowledgeMode": 1
}
],
"_clients": [
diff --git a/java/perftests/src/main/java/org/apache/qpid/disttest/client/ConsumerParticipant.java b/java/perftests/src/main/java/org/apache/qpid/disttest/client/ConsumerParticipant.java
index f9d50e8e64..8c69e5694b 100644
--- a/java/perftests/src/main/java/org/apache/qpid/disttest/client/ConsumerParticipant.java
+++ b/java/perftests/src/main/java/org/apache/qpid/disttest/client/ConsumerParticipant.java
@@ -174,7 +174,7 @@ public class ConsumerParticipant implements Participant
{
LOGGER.trace("Committing: batch size " + _command.getBatchSize() );
}
- _jmsDelegate.commitOrAcknowledgeMessage(message, _command.getSessionName());
+ _jmsDelegate.commitOrAcknowledgeMessageIfNecessary(_command.getSessionName(), message);
}
}
@@ -199,7 +199,7 @@ public class ConsumerParticipant implements Participant
}
// commit/acknowledge remaining messages if necessary
- _jmsDelegate.commitOrAcknowledgeMessage(message, _command.getSessionName());
+ _jmsDelegate.commitOrAcknowledgeMessageIfNecessary(_command.getSessionName(), message);
}
return false;
}
diff --git a/java/perftests/src/main/java/org/apache/qpid/disttest/client/ProducerParticipant.java b/java/perftests/src/main/java/org/apache/qpid/disttest/client/ProducerParticipant.java
index 63cbe98b5c..567deea6f4 100644
--- a/java/perftests/src/main/java/org/apache/qpid/disttest/client/ProducerParticipant.java
+++ b/java/perftests/src/main/java/org/apache/qpid/disttest/client/ProducerParticipant.java
@@ -119,7 +119,7 @@ public class ProducerParticipant implements Participant
{
LOGGER.trace("Committing: batch size " + _command.getBatchSize() );
}
- _jmsDelegate.commitOrAcknowledgeMessage(lastPublishedMessage, _command.getSessionName());
+ _jmsDelegate.commitIfNecessary(_command.getSessionName());
doSleepForInterval();
}
@@ -138,7 +138,7 @@ public class ProducerParticipant implements Participant
{
LOGGER.trace("Committing: batch size " + _command.getBatchSize() );
}
- _jmsDelegate.commitOrAcknowledgeMessage(lastPublishedMessage, _command.getSessionName());
+ _jmsDelegate.commitIfNecessary(_command.getSessionName());
}
Date start = new Date(startTime);
diff --git a/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ClientJmsDelegate.java b/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ClientJmsDelegate.java
index 3f8afc9a9a..a177770a30 100644
--- a/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ClientJmsDelegate.java
+++ b/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ClientJmsDelegate.java
@@ -373,30 +373,6 @@ public class ClientJmsDelegate
}
}
- public void commitOrAcknowledgeMessage(final Message message, final String sessionName)
- {
- try
- {
- final Session session = _testSessions.get(sessionName);
- if (session.getTransacted())
- {
- synchronized(session)
- {
- session.commit();
- }
- }
- else if (message != null && session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
- {
- message.acknowledge();
- }
- }
- catch (final JMSException jmse)
- {
- throw new DistributedTestException("Unable to commit or acknowledge message on session: " +
- sessionName, jmse);
- }
- }
-
public int getAcknowledgeMode(final String sessionName)
{
try
@@ -493,31 +469,36 @@ public class ClientJmsDelegate
}
}
- public void rollbackOrRecover(String sessionName)
+ public void commitOrAcknowledgeMessageIfNecessary(final String sessionName, final Message message)
{
try
{
final Session session = _testSessions.get(sessionName);
- synchronized(session)
+ if (session.getTransacted())
{
- if (session.getTransacted())
- {
- session.rollback();
- }
- else if (session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
+ synchronized(session)
{
- session.recover();
+ session.commit();
}
}
+ else if (message != null && session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
+ {
+ message.acknowledge();
+ }
}
catch (final JMSException jmse)
{
- throw new DistributedTestException("Unable to rollback or recover on session: " +
+ throw new DistributedTestException("Unable to commit or acknowledge message on session: " +
sessionName, jmse);
}
}
- public void releaseMessage(String sessionName)
+ public void commitIfNecessary(final String sessionName)
+ {
+ commitOrAcknowledgeMessageIfNecessary(sessionName, null);
+ }
+
+ public void rollbackOrRecoverIfNecessary(String sessionName)
{
try
{
@@ -528,7 +509,7 @@ public class ClientJmsDelegate
{
session.rollback();
}
- else
+ else if (session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
{
session.recover();
}
@@ -536,7 +517,8 @@ public class ClientJmsDelegate
}
catch (final JMSException jmse)
{
- LOGGER.warn("Unable to rollback or recover on session: " + sessionName, jmse);
+ throw new DistributedTestException("Unable to rollback or recover on session: " +
+ sessionName, jmse);
}
}
diff --git a/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QpidQueueCreator.java b/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QpidQueueCreator.java
index 4ce8efeae2..0b906d228f 100644
--- a/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QpidQueueCreator.java
+++ b/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QpidQueueCreator.java
@@ -24,6 +24,8 @@ import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
+
+import org.apache.qpid.AMQException;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.disttest.DistributedTestException;
@@ -36,6 +38,8 @@ public class QpidQueueCreator implements QueueCreator
{
private static final Logger LOGGER = LoggerFactory.getLogger(QpidQueueCreator.class);
private static final FieldTable EMPTY_QUEUE_BIND_ARGUMENTS = new FieldTable();
+ private static final String QUEUE_CREATOR_DRAIN_POLL_TIMEOUT = "qpid.disttest.queue.creator.drainPollTime";
+ private static int _drainPollTimeout = Integer.getInteger(QUEUE_CREATOR_DRAIN_POLL_TIMEOUT, 5000);
@Override
public void createQueues(Connection connection, Session session, List<QueueConfig> configs)
@@ -57,7 +61,10 @@ public class QpidQueueCreator implements QueueCreator
// drainQueue method is added because deletion of queue with a lot
// of messages takes time and might cause the timeout exception
- drainQueue(connection, destination);
+ if (queueHasMessages(amqSession, destination))
+ {
+ drainQueue(connection, destination);
+ }
deleteQueue(amqSession, destination.getAMQQueueName());
}
}
@@ -74,20 +81,34 @@ public class QpidQueueCreator implements QueueCreator
}
}
+ private boolean queueHasMessages(AMQSession<?, ?> amqSession, AMQDestination destination)
+ {
+ try
+ {
+ long queueDepth = amqSession.getQueueDepth(destination);
+ LOGGER.info("Queue {} has {} message(s)", destination.getQueueName(), queueDepth);
+ return queueDepth > 0;
+ }
+ catch (Exception e)
+ {
+ throw new DistributedTestException("Failed to query queue depth:" + destination, e);
+ }
+ }
+
private void drainQueue(Connection connection, AMQDestination destination)
{
Session noAckSession = null;
try
{
- LOGGER.debug("About to drain the queue " + destination);
+ LOGGER.debug("About to drain the queue {}", destination.getQueueName());
noAckSession = connection.createSession(false, org.apache.qpid.jms.Session.NO_ACKNOWLEDGE);
MessageConsumer messageConsumer = noAckSession.createConsumer(destination);
int counter = 0;
- while(messageConsumer.receive(1000l) != null)
+ while(messageConsumer.receive(_drainPollTimeout) != null)
{
counter++;
}
- LOGGER.debug("Drained " + counter + " messages from queue " + destination);
+ LOGGER.info("Drained {} message(s) from queue {} ", counter, destination.getQueueName());
messageConsumer.close();
}
catch (Exception e)
@@ -123,7 +144,7 @@ public class QpidQueueCreator implements QueueCreator
EMPTY_QUEUE_BIND_ARGUMENTS, destination.getExchangeName(),
destination, autoDelete);
- LOGGER.debug("Created queue " + queueConfig);
+ LOGGER.debug("Created queue {}", queueConfig);
}
catch (Exception e)
{
@@ -139,7 +160,7 @@ public class QpidQueueCreator implements QueueCreator
// raw protocol method public. This should be changed then we should switch the below to
// use #deleteQueue.
session.sendQueueDelete(queueName);
- LOGGER.debug("Deleted queue " + queueName);
+ LOGGER.debug("Deleted queue {}", queueName);
}
catch (Exception e)
{
diff --git a/java/perftests/src/test/java/org/apache/qpid/disttest/client/ConsumerParticipantTest.java b/java/perftests/src/test/java/org/apache/qpid/disttest/client/ConsumerParticipantTest.java
index 58589d36f4..ec5f1ec0e0 100644
--- a/java/perftests/src/test/java/org/apache/qpid/disttest/client/ConsumerParticipantTest.java
+++ b/java/perftests/src/test/java/org/apache/qpid/disttest/client/ConsumerParticipantTest.java
@@ -114,7 +114,7 @@ public class ConsumerParticipantTest extends TestCase
_inOrder.verify(_delegate).consumeMessage(PARTICIPANT_NAME1, RECEIVE_TIMEOUT);
_inOrder.verify(_delegate).calculatePayloadSizeFrom(_mockMessage);
- _inOrder.verify(_delegate).commitOrAcknowledgeMessage(_mockMessage, SESSION_NAME1);
+ _inOrder.verify(_delegate).commitOrAcknowledgeMessageIfNecessary(SESSION_NAME1, _mockMessage);
}
public void testReceiveMessagesForDurationSynch() throws Exception
@@ -129,7 +129,7 @@ public class ConsumerParticipantTest extends TestCase
verify(_delegate, atLeastOnce()).consumeMessage(PARTICIPANT_NAME1, RECEIVE_TIMEOUT);
verify(_delegate, atLeastOnce()).calculatePayloadSizeFrom(_mockMessage);
- verify(_delegate, atLeastOnce()).commitOrAcknowledgeMessage(_mockMessage, SESSION_NAME1);
+ verify(_delegate, atLeastOnce()).commitOrAcknowledgeMessageIfNecessary(SESSION_NAME1, _mockMessage);
}
public void testReceiveMessagesBatchedSynch() throws Exception
@@ -147,7 +147,7 @@ public class ConsumerParticipantTest extends TestCase
verify(_delegate, times(numberOfMessages)).consumeMessage(PARTICIPANT_NAME1, RECEIVE_TIMEOUT);
verify(_delegate, times(numberOfMessages)).calculatePayloadSizeFrom(_mockMessage);
- verify(_delegate, times(4)).commitOrAcknowledgeMessage(_mockMessage, SESSION_NAME1);
+ verify(_delegate, times(4)).commitOrAcknowledgeMessageIfNecessary(SESSION_NAME1, _mockMessage);
}
public void testReceiveMessagesWithVaryingPayloadSize() throws Exception
@@ -171,7 +171,7 @@ public class ConsumerParticipantTest extends TestCase
verify(_delegate, times(numberOfMessages)).consumeMessage(PARTICIPANT_NAME1, RECEIVE_TIMEOUT);
verify(_delegate, times(numberOfMessages)).calculatePayloadSizeFrom(_mockMessage);
- verify(_delegate, times(numberOfMessages)).commitOrAcknowledgeMessage(_mockMessage, SESSION_NAME1);
+ verify(_delegate, times(numberOfMessages)).commitOrAcknowledgeMessageIfNecessary(SESSION_NAME1, _mockMessage);
}
public void testReleaseResources()
@@ -194,7 +194,7 @@ public class ConsumerParticipantTest extends TestCase
_inOrder.verify(_delegate).consumeMessage(PARTICIPANT_NAME1, RECEIVE_TIMEOUT);
_inOrder.verify(_delegate).calculatePayloadSizeFrom(_mockMessage);
- _inOrder.verify(_delegate).commitOrAcknowledgeMessage(_mockMessage, SESSION_NAME1);
+ _inOrder.verify(_delegate).commitOrAcknowledgeMessageIfNecessary(SESSION_NAME1, _mockMessage);
assertTrue("Unexpected consuemr results", result instanceof ConsumerParticipantResult);
Collection<Long> latencies = ((ConsumerParticipantResult)result).getMessageLatencies();
assertNotNull("Message latency is not cllected", latencies);
diff --git a/java/perftests/src/test/java/org/apache/qpid/disttest/client/ProducerParticipantTest.java b/java/perftests/src/test/java/org/apache/qpid/disttest/client/ProducerParticipantTest.java
index a3ac11b756..ce36fdb9ad 100644
--- a/java/perftests/src/test/java/org/apache/qpid/disttest/client/ProducerParticipantTest.java
+++ b/java/perftests/src/test/java/org/apache/qpid/disttest/client/ProducerParticipantTest.java
@@ -127,7 +127,7 @@ public class ProducerParticipantTest extends TestCase
_inOrder.verify(_delegate).sendNextMessage(isA(CreateProducerCommand.class));
_inOrder.verify(_delegate).calculatePayloadSizeFrom(_mockMessage);
- _inOrder.verify(_delegate).commitOrAcknowledgeMessage(_mockMessage, SESSION_NAME1);
+ _inOrder.verify(_delegate).commitIfNecessary(SESSION_NAME1);
}
@@ -142,7 +142,7 @@ public class ProducerParticipantTest extends TestCase
verify(_delegate, atLeastOnce()).sendNextMessage(isA(CreateProducerCommand.class));
verify(_delegate, atLeastOnce()).calculatePayloadSizeFrom(_mockMessage);
- verify(_delegate, atLeastOnce()).commitOrAcknowledgeMessage(_mockMessage, SESSION_NAME1);
+ verify(_delegate, atLeastOnce()).commitIfNecessary(SESSION_NAME1);
}
public void testSendMessageBatches() throws Exception
@@ -161,7 +161,7 @@ public class ProducerParticipantTest extends TestCase
verify(_delegate, times(numberOfMessages)).sendNextMessage(isA(CreateProducerCommand.class));
verify(_delegate, times(numberOfMessages)).calculatePayloadSizeFrom(_mockMessage);
- verify(_delegate, times(expectedNumberOfCommits)).commitOrAcknowledgeMessage(_mockMessage, SESSION_NAME1);
+ verify(_delegate, times(expectedNumberOfCommits)).commitIfNecessary(SESSION_NAME1);
}
public void testSendMessageWithPublishInterval() throws Exception
@@ -183,7 +183,7 @@ public class ProducerParticipantTest extends TestCase
verify(_delegate, times(numberOfMessages)).sendNextMessage(isA(CreateProducerCommand.class));
verify(_delegate, times(numberOfMessages)).calculatePayloadSizeFrom(_mockMessage);
- verify(_delegate, times(4)).commitOrAcknowledgeMessage(_mockMessage, SESSION_NAME1);
+ verify(_delegate, times(4)).commitIfNecessary(SESSION_NAME1);
}
public void testSendMessageWithVaryingPayloadSize() throws Exception
@@ -208,7 +208,7 @@ public class ProducerParticipantTest extends TestCase
verify(_delegate, times(numberOfMessages)).sendNextMessage(isA(CreateProducerCommand.class));
verify(_delegate, times(numberOfMessages)).calculatePayloadSizeFrom(_mockMessage);
- verify(_delegate, times(numberOfMessages)).commitOrAcknowledgeMessage(_mockMessage, SESSION_NAME1);
+ verify(_delegate, times(numberOfMessages)).commitIfNecessary(SESSION_NAME1);
}
public void testReleaseResources()