summaryrefslogtreecommitdiff
path: root/java/perftests/src/main/java/org/apache/qpid/disttest/client/ConsumerParticipant.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/perftests/src/main/java/org/apache/qpid/disttest/client/ConsumerParticipant.java')
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/disttest/client/ConsumerParticipant.java59
1 files changed, 43 insertions, 16 deletions
diff --git a/java/perftests/src/main/java/org/apache/qpid/disttest/client/ConsumerParticipant.java b/java/perftests/src/main/java/org/apache/qpid/disttest/client/ConsumerParticipant.java
index 1b5e8276c2..f9d50e8e64 100644
--- a/java/perftests/src/main/java/org/apache/qpid/disttest/client/ConsumerParticipant.java
+++ b/java/perftests/src/main/java/org/apache/qpid/disttest/client/ConsumerParticipant.java
@@ -20,13 +20,16 @@
package org.apache.qpid.disttest.client;
+import java.util.ArrayList;
import java.util.Date;
+import java.util.List;
import java.util.NavigableSet;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
@@ -53,12 +56,17 @@ public class ConsumerParticipant implements Participant
private long _startTime;
private volatile Exception _asyncMessageListenerException;
+ private List<Long> _messageLatencies;
public ConsumerParticipant(final ClientJmsDelegate delegate, final CreateConsumerCommand command)
{
_jmsDelegate = delegate;
_command = command;
_resultFactory = new ParticipantResultFactory();
+ if (command.isEvaluateLatency())
+ {
+ _messageLatencies = new ArrayList<Long>();
+ }
}
@Override
@@ -78,6 +86,8 @@ public class ConsumerParticipant implements Participant
}
else
{
+ LOGGER.info("Consumer {} registering listener", getName());
+
_jmsDelegate.registerListener(_command.getParticipantName(), new MessageListener(){
@Override
@@ -105,14 +115,14 @@ public class ConsumerParticipant implements Participant
numberOfMessagesSent,
payloadSize,
totalPayloadSize,
- start, end);
+ start, end, _messageLatencies);
return result;
}
private void synchronousRun()
{
- LOGGER.debug("entered synchronousRun: " + this);
+ LOGGER.info("Consumer {} about to consume messages", getName());
_startTime = System.currentTimeMillis();
@@ -130,25 +140,42 @@ public class ConsumerParticipant implements Participant
*/
private boolean processMessage(Message message)
{
- int messageCount = _totalNumberOfMessagesReceived.incrementAndGet();
- if (LOGGER.isTraceEnabled())
- {
- LOGGER.trace("message " + messageCount + " received by " + this);
- }
- int messagePayloadSize = _jmsDelegate.calculatePayloadSizeFrom(message);
- _allConsumedPayloadSizes.add(messagePayloadSize);
- _totalPayloadSizeOfAllMessagesReceived.addAndGet(messagePayloadSize);
-
+ int messageCount = message == null? _totalNumberOfMessagesReceived.get() : _totalNumberOfMessagesReceived.incrementAndGet() ;
boolean batchEnabled = _command.getBatchSize() > 0;
boolean batchComplete = batchEnabled && messageCount % _command.getBatchSize() == 0;
-
- if (!batchEnabled || batchComplete)
+ if (message != null)
{
- if (LOGGER.isTraceEnabled() && batchEnabled)
+ if (LOGGER.isTraceEnabled())
{
- LOGGER.trace("Committing: batch size " + _command.getBatchSize() );
+ LOGGER.trace("message " + messageCount + " received by " + this);
+ }
+ int messagePayloadSize = _jmsDelegate.calculatePayloadSizeFrom(message);
+ _allConsumedPayloadSizes.add(messagePayloadSize);
+ _totalPayloadSizeOfAllMessagesReceived.addAndGet(messagePayloadSize);
+
+ if (_command.isEvaluateLatency())
+ {
+ long mesageTimestamp;
+ try
+ {
+ mesageTimestamp = message.getJMSTimestamp();
+ }
+ catch (JMSException e)
+ {
+ throw new DistributedTestException("Cannot get message timestamp!", e);
+ }
+ long latency = System.currentTimeMillis() - mesageTimestamp;
+ _messageLatencies.add(latency);
+ }
+
+ if (!batchEnabled || batchComplete)
+ {
+ if (LOGGER.isTraceEnabled() && batchEnabled)
+ {
+ LOGGER.trace("Committing: batch size " + _command.getBatchSize() );
+ }
+ _jmsDelegate.commitOrAcknowledgeMessage(message, _command.getSessionName());
}
- _jmsDelegate.commitOrAcknowledgeMessage(message, _command.getSessionName());
}
boolean reachedExpectedNumberOfMessages = _command.getNumberOfMessages() > 0 && messageCount >= _command.getNumberOfMessages();