diff options
author | Keith Wall <kwall@apache.org> | 2012-09-19 12:53:30 +0000 |
---|---|---|
committer | Keith Wall <kwall@apache.org> | 2012-09-19 12:53:30 +0000 |
commit | 16551bcc5e6adda658cea2bc8b9e956adf1b1d8e (patch) | |
tree | f385a3014bdd0876606bcdb4363723b8f61a071b | |
parent | c90dd9d1f079e897ffc6d7056357440308c51dcb (diff) | |
download | qpid-python-16551bcc5e6adda658cea2bc8b9e956adf1b1d8e.tar.gz |
QPID-4321: Perf tests should not try to call Message#acknowledge on a producing session
* ProducerParticipant makes erroneous call to Message#acknowledge
* Externalise the poll timeout used by QpidQueueCreator to drain the queue after test
* Topic-AckModes.js - replace tests for client-ack, dups-okay-ack with session transacted
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1387565 13f79535-47bb-0310-9956-ffa450edef68
7 files changed, 61 insertions, 61 deletions
diff --git a/java/perftests/etc/testdefs/Topic-AckModes.js b/java/perftests/etc/testdefs/Topic-AckModes.js index 63c4b8646e..31da633695 100644 --- a/java/perftests/etc/testdefs/Topic-AckModes.js +++ b/java/perftests/etc/testdefs/Topic-AckModes.js @@ -27,13 +27,10 @@ var jsonObject = { "_name": "Topic ack modes", "_iterations": [ { - "_acknowledgeMode": 1 - }, - { - "_acknowledgeMode": 2 + "_acknowledgeMode": 0 }, { - "_acknowledgeMode": 3 + "_acknowledgeMode": 1 } ], "_clients": [ diff --git a/java/perftests/src/main/java/org/apache/qpid/disttest/client/ConsumerParticipant.java b/java/perftests/src/main/java/org/apache/qpid/disttest/client/ConsumerParticipant.java index f9d50e8e64..8c69e5694b 100644 --- a/java/perftests/src/main/java/org/apache/qpid/disttest/client/ConsumerParticipant.java +++ b/java/perftests/src/main/java/org/apache/qpid/disttest/client/ConsumerParticipant.java @@ -174,7 +174,7 @@ public class ConsumerParticipant implements Participant { LOGGER.trace("Committing: batch size " + _command.getBatchSize() ); } - _jmsDelegate.commitOrAcknowledgeMessage(message, _command.getSessionName()); + _jmsDelegate.commitOrAcknowledgeMessageIfNecessary(_command.getSessionName(), message); } } @@ -199,7 +199,7 @@ public class ConsumerParticipant implements Participant } // commit/acknowledge remaining messages if necessary - _jmsDelegate.commitOrAcknowledgeMessage(message, _command.getSessionName()); + _jmsDelegate.commitOrAcknowledgeMessageIfNecessary(_command.getSessionName(), message); } return false; } diff --git a/java/perftests/src/main/java/org/apache/qpid/disttest/client/ProducerParticipant.java b/java/perftests/src/main/java/org/apache/qpid/disttest/client/ProducerParticipant.java index 63cbe98b5c..567deea6f4 100644 --- a/java/perftests/src/main/java/org/apache/qpid/disttest/client/ProducerParticipant.java +++ b/java/perftests/src/main/java/org/apache/qpid/disttest/client/ProducerParticipant.java @@ -119,7 +119,7 @@ public class ProducerParticipant implements Participant { LOGGER.trace("Committing: batch size " + _command.getBatchSize() ); } - _jmsDelegate.commitOrAcknowledgeMessage(lastPublishedMessage, _command.getSessionName()); + _jmsDelegate.commitIfNecessary(_command.getSessionName()); doSleepForInterval(); } @@ -138,7 +138,7 @@ public class ProducerParticipant implements Participant { LOGGER.trace("Committing: batch size " + _command.getBatchSize() ); } - _jmsDelegate.commitOrAcknowledgeMessage(lastPublishedMessage, _command.getSessionName()); + _jmsDelegate.commitIfNecessary(_command.getSessionName()); } Date start = new Date(startTime); diff --git a/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ClientJmsDelegate.java b/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ClientJmsDelegate.java index 3f8afc9a9a..a177770a30 100644 --- a/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ClientJmsDelegate.java +++ b/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ClientJmsDelegate.java @@ -373,30 +373,6 @@ public class ClientJmsDelegate } } - public void commitOrAcknowledgeMessage(final Message message, final String sessionName) - { - try - { - final Session session = _testSessions.get(sessionName); - if (session.getTransacted()) - { - synchronized(session) - { - session.commit(); - } - } - else if (message != null && session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE) - { - message.acknowledge(); - } - } - catch (final JMSException jmse) - { - throw new DistributedTestException("Unable to commit or acknowledge message on session: " + - sessionName, jmse); - } - } - public int getAcknowledgeMode(final String sessionName) { try @@ -493,31 +469,36 @@ public class ClientJmsDelegate } } - public void rollbackOrRecover(String sessionName) + public void commitOrAcknowledgeMessageIfNecessary(final String sessionName, final Message message) { try { final Session session = _testSessions.get(sessionName); - synchronized(session) + if (session.getTransacted()) { - if (session.getTransacted()) - { - session.rollback(); - } - else if (session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE) + synchronized(session) { - session.recover(); + session.commit(); } } + else if (message != null && session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE) + { + message.acknowledge(); + } } catch (final JMSException jmse) { - throw new DistributedTestException("Unable to rollback or recover on session: " + + throw new DistributedTestException("Unable to commit or acknowledge message on session: " + sessionName, jmse); } } - public void releaseMessage(String sessionName) + public void commitIfNecessary(final String sessionName) + { + commitOrAcknowledgeMessageIfNecessary(sessionName, null); + } + + public void rollbackOrRecoverIfNecessary(String sessionName) { try { @@ -528,7 +509,7 @@ public class ClientJmsDelegate { session.rollback(); } - else + else if (session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE) { session.recover(); } @@ -536,7 +517,8 @@ public class ClientJmsDelegate } catch (final JMSException jmse) { - LOGGER.warn("Unable to rollback or recover on session: " + sessionName, jmse); + throw new DistributedTestException("Unable to rollback or recover on session: " + + sessionName, jmse); } } diff --git a/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QpidQueueCreator.java b/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QpidQueueCreator.java index 4ce8efeae2..0b906d228f 100644 --- a/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QpidQueueCreator.java +++ b/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QpidQueueCreator.java @@ -24,6 +24,8 @@ import javax.jms.Connection; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.Session; + +import org.apache.qpid.AMQException; import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.AMQSession; import org.apache.qpid.disttest.DistributedTestException; @@ -36,6 +38,8 @@ public class QpidQueueCreator implements QueueCreator { private static final Logger LOGGER = LoggerFactory.getLogger(QpidQueueCreator.class); private static final FieldTable EMPTY_QUEUE_BIND_ARGUMENTS = new FieldTable(); + private static final String QUEUE_CREATOR_DRAIN_POLL_TIMEOUT = "qpid.disttest.queue.creator.drainPollTime"; + private static int _drainPollTimeout = Integer.getInteger(QUEUE_CREATOR_DRAIN_POLL_TIMEOUT, 5000); @Override public void createQueues(Connection connection, Session session, List<QueueConfig> configs) @@ -57,7 +61,10 @@ public class QpidQueueCreator implements QueueCreator // drainQueue method is added because deletion of queue with a lot // of messages takes time and might cause the timeout exception - drainQueue(connection, destination); + if (queueHasMessages(amqSession, destination)) + { + drainQueue(connection, destination); + } deleteQueue(amqSession, destination.getAMQQueueName()); } } @@ -74,20 +81,34 @@ public class QpidQueueCreator implements QueueCreator } } + private boolean queueHasMessages(AMQSession<?, ?> amqSession, AMQDestination destination) + { + try + { + long queueDepth = amqSession.getQueueDepth(destination); + LOGGER.info("Queue {} has {} message(s)", destination.getQueueName(), queueDepth); + return queueDepth > 0; + } + catch (Exception e) + { + throw new DistributedTestException("Failed to query queue depth:" + destination, e); + } + } + private void drainQueue(Connection connection, AMQDestination destination) { Session noAckSession = null; try { - LOGGER.debug("About to drain the queue " + destination); + LOGGER.debug("About to drain the queue {}", destination.getQueueName()); noAckSession = connection.createSession(false, org.apache.qpid.jms.Session.NO_ACKNOWLEDGE); MessageConsumer messageConsumer = noAckSession.createConsumer(destination); int counter = 0; - while(messageConsumer.receive(1000l) != null) + while(messageConsumer.receive(_drainPollTimeout) != null) { counter++; } - LOGGER.debug("Drained " + counter + " messages from queue " + destination); + LOGGER.info("Drained {} message(s) from queue {} ", counter, destination.getQueueName()); messageConsumer.close(); } catch (Exception e) @@ -123,7 +144,7 @@ public class QpidQueueCreator implements QueueCreator EMPTY_QUEUE_BIND_ARGUMENTS, destination.getExchangeName(), destination, autoDelete); - LOGGER.debug("Created queue " + queueConfig); + LOGGER.debug("Created queue {}", queueConfig); } catch (Exception e) { @@ -139,7 +160,7 @@ public class QpidQueueCreator implements QueueCreator // raw protocol method public. This should be changed then we should switch the below to // use #deleteQueue. session.sendQueueDelete(queueName); - LOGGER.debug("Deleted queue " + queueName); + LOGGER.debug("Deleted queue {}", queueName); } catch (Exception e) { diff --git a/java/perftests/src/test/java/org/apache/qpid/disttest/client/ConsumerParticipantTest.java b/java/perftests/src/test/java/org/apache/qpid/disttest/client/ConsumerParticipantTest.java index 58589d36f4..ec5f1ec0e0 100644 --- a/java/perftests/src/test/java/org/apache/qpid/disttest/client/ConsumerParticipantTest.java +++ b/java/perftests/src/test/java/org/apache/qpid/disttest/client/ConsumerParticipantTest.java @@ -114,7 +114,7 @@ public class ConsumerParticipantTest extends TestCase _inOrder.verify(_delegate).consumeMessage(PARTICIPANT_NAME1, RECEIVE_TIMEOUT); _inOrder.verify(_delegate).calculatePayloadSizeFrom(_mockMessage); - _inOrder.verify(_delegate).commitOrAcknowledgeMessage(_mockMessage, SESSION_NAME1); + _inOrder.verify(_delegate).commitOrAcknowledgeMessageIfNecessary(SESSION_NAME1, _mockMessage); } public void testReceiveMessagesForDurationSynch() throws Exception @@ -129,7 +129,7 @@ public class ConsumerParticipantTest extends TestCase verify(_delegate, atLeastOnce()).consumeMessage(PARTICIPANT_NAME1, RECEIVE_TIMEOUT); verify(_delegate, atLeastOnce()).calculatePayloadSizeFrom(_mockMessage); - verify(_delegate, atLeastOnce()).commitOrAcknowledgeMessage(_mockMessage, SESSION_NAME1); + verify(_delegate, atLeastOnce()).commitOrAcknowledgeMessageIfNecessary(SESSION_NAME1, _mockMessage); } public void testReceiveMessagesBatchedSynch() throws Exception @@ -147,7 +147,7 @@ public class ConsumerParticipantTest extends TestCase verify(_delegate, times(numberOfMessages)).consumeMessage(PARTICIPANT_NAME1, RECEIVE_TIMEOUT); verify(_delegate, times(numberOfMessages)).calculatePayloadSizeFrom(_mockMessage); - verify(_delegate, times(4)).commitOrAcknowledgeMessage(_mockMessage, SESSION_NAME1); + verify(_delegate, times(4)).commitOrAcknowledgeMessageIfNecessary(SESSION_NAME1, _mockMessage); } public void testReceiveMessagesWithVaryingPayloadSize() throws Exception @@ -171,7 +171,7 @@ public class ConsumerParticipantTest extends TestCase verify(_delegate, times(numberOfMessages)).consumeMessage(PARTICIPANT_NAME1, RECEIVE_TIMEOUT); verify(_delegate, times(numberOfMessages)).calculatePayloadSizeFrom(_mockMessage); - verify(_delegate, times(numberOfMessages)).commitOrAcknowledgeMessage(_mockMessage, SESSION_NAME1); + verify(_delegate, times(numberOfMessages)).commitOrAcknowledgeMessageIfNecessary(SESSION_NAME1, _mockMessage); } public void testReleaseResources() @@ -194,7 +194,7 @@ public class ConsumerParticipantTest extends TestCase _inOrder.verify(_delegate).consumeMessage(PARTICIPANT_NAME1, RECEIVE_TIMEOUT); _inOrder.verify(_delegate).calculatePayloadSizeFrom(_mockMessage); - _inOrder.verify(_delegate).commitOrAcknowledgeMessage(_mockMessage, SESSION_NAME1); + _inOrder.verify(_delegate).commitOrAcknowledgeMessageIfNecessary(SESSION_NAME1, _mockMessage); assertTrue("Unexpected consuemr results", result instanceof ConsumerParticipantResult); Collection<Long> latencies = ((ConsumerParticipantResult)result).getMessageLatencies(); assertNotNull("Message latency is not cllected", latencies); diff --git a/java/perftests/src/test/java/org/apache/qpid/disttest/client/ProducerParticipantTest.java b/java/perftests/src/test/java/org/apache/qpid/disttest/client/ProducerParticipantTest.java index a3ac11b756..ce36fdb9ad 100644 --- a/java/perftests/src/test/java/org/apache/qpid/disttest/client/ProducerParticipantTest.java +++ b/java/perftests/src/test/java/org/apache/qpid/disttest/client/ProducerParticipantTest.java @@ -127,7 +127,7 @@ public class ProducerParticipantTest extends TestCase _inOrder.verify(_delegate).sendNextMessage(isA(CreateProducerCommand.class)); _inOrder.verify(_delegate).calculatePayloadSizeFrom(_mockMessage); - _inOrder.verify(_delegate).commitOrAcknowledgeMessage(_mockMessage, SESSION_NAME1); + _inOrder.verify(_delegate).commitIfNecessary(SESSION_NAME1); } @@ -142,7 +142,7 @@ public class ProducerParticipantTest extends TestCase verify(_delegate, atLeastOnce()).sendNextMessage(isA(CreateProducerCommand.class)); verify(_delegate, atLeastOnce()).calculatePayloadSizeFrom(_mockMessage); - verify(_delegate, atLeastOnce()).commitOrAcknowledgeMessage(_mockMessage, SESSION_NAME1); + verify(_delegate, atLeastOnce()).commitIfNecessary(SESSION_NAME1); } public void testSendMessageBatches() throws Exception @@ -161,7 +161,7 @@ public class ProducerParticipantTest extends TestCase verify(_delegate, times(numberOfMessages)).sendNextMessage(isA(CreateProducerCommand.class)); verify(_delegate, times(numberOfMessages)).calculatePayloadSizeFrom(_mockMessage); - verify(_delegate, times(expectedNumberOfCommits)).commitOrAcknowledgeMessage(_mockMessage, SESSION_NAME1); + verify(_delegate, times(expectedNumberOfCommits)).commitIfNecessary(SESSION_NAME1); } public void testSendMessageWithPublishInterval() throws Exception @@ -183,7 +183,7 @@ public class ProducerParticipantTest extends TestCase verify(_delegate, times(numberOfMessages)).sendNextMessage(isA(CreateProducerCommand.class)); verify(_delegate, times(numberOfMessages)).calculatePayloadSizeFrom(_mockMessage); - verify(_delegate, times(4)).commitOrAcknowledgeMessage(_mockMessage, SESSION_NAME1); + verify(_delegate, times(4)).commitIfNecessary(SESSION_NAME1); } public void testSendMessageWithVaryingPayloadSize() throws Exception @@ -208,7 +208,7 @@ public class ProducerParticipantTest extends TestCase verify(_delegate, times(numberOfMessages)).sendNextMessage(isA(CreateProducerCommand.class)); verify(_delegate, times(numberOfMessages)).calculatePayloadSizeFrom(_mockMessage); - verify(_delegate, times(numberOfMessages)).commitOrAcknowledgeMessage(_mockMessage, SESSION_NAME1); + verify(_delegate, times(numberOfMessages)).commitIfNecessary(SESSION_NAME1); } public void testReleaseResources() |