diff options
Diffstat (limited to 'java/perftests/src/main/java/org/apache/qpid/disttest/client/ConsumerParticipant.java')
-rw-r--r-- | java/perftests/src/main/java/org/apache/qpid/disttest/client/ConsumerParticipant.java | 59 |
1 files changed, 43 insertions, 16 deletions
diff --git a/java/perftests/src/main/java/org/apache/qpid/disttest/client/ConsumerParticipant.java b/java/perftests/src/main/java/org/apache/qpid/disttest/client/ConsumerParticipant.java index 1b5e8276c2..f9d50e8e64 100644 --- a/java/perftests/src/main/java/org/apache/qpid/disttest/client/ConsumerParticipant.java +++ b/java/perftests/src/main/java/org/apache/qpid/disttest/client/ConsumerParticipant.java @@ -20,13 +20,16 @@ package org.apache.qpid.disttest.client; +import java.util.ArrayList; import java.util.Date; +import java.util.List; import java.util.NavigableSet; import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; @@ -53,12 +56,17 @@ public class ConsumerParticipant implements Participant private long _startTime; private volatile Exception _asyncMessageListenerException; + private List<Long> _messageLatencies; public ConsumerParticipant(final ClientJmsDelegate delegate, final CreateConsumerCommand command) { _jmsDelegate = delegate; _command = command; _resultFactory = new ParticipantResultFactory(); + if (command.isEvaluateLatency()) + { + _messageLatencies = new ArrayList<Long>(); + } } @Override @@ -78,6 +86,8 @@ public class ConsumerParticipant implements Participant } else { + LOGGER.info("Consumer {} registering listener", getName()); + _jmsDelegate.registerListener(_command.getParticipantName(), new MessageListener(){ @Override @@ -105,14 +115,14 @@ public class ConsumerParticipant implements Participant numberOfMessagesSent, payloadSize, totalPayloadSize, - start, end); + start, end, _messageLatencies); return result; } private void synchronousRun() { - LOGGER.debug("entered synchronousRun: " + this); + LOGGER.info("Consumer {} about to consume messages", getName()); _startTime = System.currentTimeMillis(); @@ -130,25 +140,42 @@ public class ConsumerParticipant implements Participant */ private boolean processMessage(Message message) { - int messageCount = _totalNumberOfMessagesReceived.incrementAndGet(); - if (LOGGER.isTraceEnabled()) - { - LOGGER.trace("message " + messageCount + " received by " + this); - } - int messagePayloadSize = _jmsDelegate.calculatePayloadSizeFrom(message); - _allConsumedPayloadSizes.add(messagePayloadSize); - _totalPayloadSizeOfAllMessagesReceived.addAndGet(messagePayloadSize); - + int messageCount = message == null? _totalNumberOfMessagesReceived.get() : _totalNumberOfMessagesReceived.incrementAndGet() ; boolean batchEnabled = _command.getBatchSize() > 0; boolean batchComplete = batchEnabled && messageCount % _command.getBatchSize() == 0; - - if (!batchEnabled || batchComplete) + if (message != null) { - if (LOGGER.isTraceEnabled() && batchEnabled) + if (LOGGER.isTraceEnabled()) { - LOGGER.trace("Committing: batch size " + _command.getBatchSize() ); + LOGGER.trace("message " + messageCount + " received by " + this); + } + int messagePayloadSize = _jmsDelegate.calculatePayloadSizeFrom(message); + _allConsumedPayloadSizes.add(messagePayloadSize); + _totalPayloadSizeOfAllMessagesReceived.addAndGet(messagePayloadSize); + + if (_command.isEvaluateLatency()) + { + long mesageTimestamp; + try + { + mesageTimestamp = message.getJMSTimestamp(); + } + catch (JMSException e) + { + throw new DistributedTestException("Cannot get message timestamp!", e); + } + long latency = System.currentTimeMillis() - mesageTimestamp; + _messageLatencies.add(latency); + } + + if (!batchEnabled || batchComplete) + { + if (LOGGER.isTraceEnabled() && batchEnabled) + { + LOGGER.trace("Committing: batch size " + _command.getBatchSize() ); + } + _jmsDelegate.commitOrAcknowledgeMessage(message, _command.getSessionName()); } - _jmsDelegate.commitOrAcknowledgeMessage(message, _command.getSessionName()); } boolean reachedExpectedNumberOfMessages = _command.getNumberOfMessages() > 0 && messageCount >= _command.getNumberOfMessages(); |