summaryrefslogtreecommitdiff
path: root/qpid
diff options
context:
space:
mode:
authorBhupendra Bhusman Bhardwaj <bhupendrab@apache.org>2007-01-17 11:16:41 +0000
committerBhupendra Bhusman Bhardwaj <bhupendrab@apache.org>2007-01-17 11:16:41 +0000
commit783b2c2cacd1d1d8a2742223ad91c02e54448f1a (patch)
treea1b2c10b4daaffbadd27d87b03d1572860e51d58 /qpid
parent671dc16dbd0cd6e3a1677240136d127521bec0bf (diff)
downloadqpid-python-783b2c2cacd1d1d8a2742223ad91c02e54448f1a.tar.gz
added timer for callbackHandler to wait for next message before exiting
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@496991 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid')
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceRequestingClient.java149
1 files changed, 116 insertions, 33 deletions
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceRequestingClient.java b/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceRequestingClient.java
index 7d4b5eb24f..27bf8c74ea 100644
--- a/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceRequestingClient.java
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceRequestingClient.java
@@ -40,8 +40,11 @@ import java.net.UnknownHostException;
* A client that behaves as follows:
* <ul><li>Connects to a queue, whose name is specified as a cmd-line argument</li>
* <li>Creates a temporary queue</li>
- * <li>Creates messages containing a property that is the name of the temporary queue</li>
- * <li>Fires off a message on the original queue and waits for a response on the temporary queue</li>
+ * <li>Creates messages containing a property(reply-to) that is the name of the temporary queue</li>
+ * <li>Fires off a message on the original queue and registers the callbackHandler to listen to the response on the temporary queue</li>
+ * <li>Start the loop to send all messages</li>
+ * <li>CallbackHandler keeps listening to the responses and exits if all the messages have been received back or
+ * if the waiting time for next message is elapsed</li>
* </ul>
*/
public class ServiceRequestingClient implements ExceptionListener
@@ -49,6 +52,10 @@ public class ServiceRequestingClient implements ExceptionListener
private static final Logger _log = Logger.getLogger(ServiceRequestingClient.class);
private long _messageIdentifier = 0;
+
+ // time for which callbackHandler should wait for a message before exiting. Default time= 60 secs
+ private static long _callbackHandlerWaitingTime = 60000;
+
private String MESSAGE_DATA;
private AMQConnection _connection;
@@ -71,20 +78,23 @@ public class ServiceRequestingClient implements ExceptionListener
private class CallbackHandler implements MessageListener
{
- private int _expectedMessageCount;
-
private int _actualMessageCount;
private long _startTime;
+ // The time when the last message was received by the callbackHandler
+ private long _messageReceivedTime = 0;
+ private Object _timerCallbackHandler = new Object();
- public CallbackHandler(int expectedMessageCount, long startTime)
+ public CallbackHandler(long startTime)
{
- _expectedMessageCount = expectedMessageCount;
_startTime = startTime;
+ // Start the timer thread, which will keep checking if test should exit because the waiting time has elapsed
+ (new Thread(new TimerThread())).start();
}
public void onMessage(Message m)
{
+ _messageReceivedTime = System.currentTimeMillis();
if (_log.isDebugEnabled())
{
_log.debug("Message received: " + m);
@@ -95,16 +105,15 @@ public class ServiceRequestingClient implements ExceptionListener
if (m.propertyExists("timeSent"))
{
long timeSent = Long.parseLong(m.getStringProperty("timeSent"));
- long now = System.currentTimeMillis();
if (_averageLatency == 0)
{
- _averageLatency = now - timeSent;
+ _averageLatency = _messageReceivedTime - timeSent;
_log.info("Latency " + _averageLatency);
}
else
{
- _log.info("Individual latency: " + (now - timeSent));
- _averageLatency = (_averageLatency + (now - timeSent)) / 2;
+ _log.info("Individual latency: " + (_messageReceivedTime - timeSent));
+ _averageLatency = (_averageLatency + (_messageReceivedTime - timeSent)) / 2;
_log.info("Average latency now: " + _averageLatency);
}
}
@@ -124,27 +133,91 @@ public class ServiceRequestingClient implements ExceptionListener
}
checkForMessageID(m);
- if (_actualMessageCount == _expectedMessageCount)
+
+ if (_actualMessageCount == _messageCount)
{
- _completed = true;
- notifyWaiter();
- long timeTaken = System.currentTimeMillis() - _startTime;
- _log.info("Total time taken to receive " + _expectedMessageCount + " messages was " +
- timeTaken + "ms, equivalent to " +
- (_expectedMessageCount / (timeTaken / 1000.0)) + " messages per second");
-
- try
+ finishTesting(_actualMessageCount);
+ }
+ }
+
+ /**
+ * sets completed flag to true, closes the callbackHandler connection and notifies the waiter thread,
+ * so that the callbackHandler can finish listening for messages. This causes the test to finish.
+ * @param receivedMessageCount
+ */
+ private void finishTesting(int receivedMessageCount)
+ {
+ _completed = true;
+ notifyWaiter();
+ notifyTimerThread();
+
+ long timeTaken = System.currentTimeMillis() - _startTime;
+ _log.info("***** Result *****");
+ _log.info("Total messages received = " + receivedMessageCount);
+ _log.info("Total time taken to receive " + receivedMessageCount + " messages was " +
+ timeTaken + "ms, equivalent to " +
+ (receivedMessageCount / (timeTaken / 1000.0)) + " messages per second");
+
+ try
+ {
+ _connection.close();
+ _log.info("Connection closed");
+ }
+ catch (JMSException e)
+ {
+ _log.error("Error closing connection");
+ }
+ }
+
+ private void notifyTimerThread()
+ {
+ if (_timerCallbackHandler != null)
+ {
+ synchronized (_timerCallbackHandler)
{
- _connection.close();
- _log.info("Connection closed");
+ _timerCallbackHandler.notify();
}
- catch (JMSException e)
+ }
+ }
+
+ /**
+ * Thread class implementing the timer for callbackHandler. The thread will exit the test if the waiting time
+ * has elapsed before next message is received.
+ */
+ private class TimerThread implements Runnable
+ {
+ public void run()
+ {
+ do
{
- _log.error("Error closing connection");
+ try
+ {
+ synchronized(_timerCallbackHandler)
+ {
+ _timerCallbackHandler.wait(_callbackHandlerWaitingTime);
+ }
+ }
+ catch (InterruptedException ignore)
+ {
+
+ }
+
+ // exit if callbackHandler has received all messages
+ if (_completed)
+ {
+ _log.info("timer " + new java.util.Date());
+ return;
+ }
}
+ while ((System.currentTimeMillis() - _messageReceivedTime) < _callbackHandlerWaitingTime);
+
+ // waiting time has elapsed, so exit the test
+ _log.info("");
+ _log.info("Exited after waiting for " + _callbackHandlerWaitingTime/1000 + " secs");
+ finishTesting(_actualMessageCount);
}
}
- }
+ } // end of CallbackHandler class
/**
* Checks if the received AMQ Message ID(delivery tag) is in sequence, by comparing it with the AMQ MessageID
@@ -230,7 +303,7 @@ public class ServiceRequestingClient implements ExceptionListener
//now start the clock and the test...
final long startTime = System.currentTimeMillis();
- messageConsumer.setMessageListener(new CallbackHandler(messageCount, startTime));
+ messageConsumer.setMessageListener(new CallbackHandler(startTime));
}
catch (JMSException e)
{
@@ -284,10 +357,12 @@ public class ServiceRequestingClient implements ExceptionListener
*/
public static void main(String[] args)
{
- if (args.length < 6)
+ if ((args.length < 6) || (args.length == 8))
{
- System.err.println(
- "Usage: ServiceRequestingClient <brokerDetails> <username> <password> <vpath> <command queue name> <number of messages> [<message size>] [<P[ersistent]|N[onPersistent] (Default N)> <T[ransacted]|N[onTransacted] (Default N)>]");
+ System.err.println("Usage: ServiceRequestingClient <brokerDetails> <username> <password> <vpath> " +
+ "<command queue name> <number of messages> [<message size>] " +
+ "[<P[ersistent]|N[onPersistent] (Default N)> <T[ransacted]|N[onTransacted] (Default N)>] " +
+ "[<waiting time for response in sec (default 60 sec)>]");
System.exit(1);
}
try
@@ -296,18 +371,24 @@ public class ServiceRequestingClient implements ExceptionListener
boolean transactedMode = false;
int deliveryMode = DeliveryMode.NON_PERSISTENT;
+ if (args.length > 6)
+ {
+ messageSize = Integer.parseInt(args[6]);
+ }
if (args.length > 7)
{
- deliveryMode = args[args.length - 2].toUpperCase().charAt(0) == 'P' ? DeliveryMode.PERSISTENT
+ deliveryMode = args[7].toUpperCase().charAt(0) == 'P' ? DeliveryMode.PERSISTENT
: DeliveryMode.NON_PERSISTENT;
- transactedMode = args[args.length - 1].toUpperCase().charAt(0) == 'T' ? true : false;
+ transactedMode = args[8].toUpperCase().charAt(0) == 'T' ? true : false;
}
- if ((args.length == 9) ||(args.length == 7))
+ if (args.length > 9)
{
- messageSize = Integer.parseInt(args[6]);
- }
+ _callbackHandlerWaitingTime = Long.parseLong(args[9]) * 1000;
+ }
+
+ _log.info("Each message size = " + messageSize + " bytes");
InetAddress address = InetAddress.getLocalHost();
String clientID = address.getHostName() + System.currentTimeMillis();
@@ -316,6 +397,8 @@ public class ServiceRequestingClient implements ExceptionListener
messageSize);
Object waiter = new Object();
client.run(waiter);
+
+ // Start a thread to
synchronized (waiter)
{
while (!client.isCompleted())