diff options
author | Bhupendra Bhusman Bhardwaj <bhupendrab@apache.org> | 2007-01-17 11:16:41 +0000 |
---|---|---|
committer | Bhupendra Bhusman Bhardwaj <bhupendrab@apache.org> | 2007-01-17 11:16:41 +0000 |
commit | 783b2c2cacd1d1d8a2742223ad91c02e54448f1a (patch) | |
tree | a1b2c10b4daaffbadd27d87b03d1572860e51d58 /qpid | |
parent | 671dc16dbd0cd6e3a1677240136d127521bec0bf (diff) | |
download | qpid-python-783b2c2cacd1d1d8a2742223ad91c02e54448f1a.tar.gz |
added timer for callbackHandler to wait for next message before exiting
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@496991 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid')
-rw-r--r-- | qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceRequestingClient.java | 149 |
1 files changed, 116 insertions, 33 deletions
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceRequestingClient.java b/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceRequestingClient.java index 7d4b5eb24f..27bf8c74ea 100644 --- a/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceRequestingClient.java +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceRequestingClient.java @@ -40,8 +40,11 @@ import java.net.UnknownHostException; * A client that behaves as follows: * <ul><li>Connects to a queue, whose name is specified as a cmd-line argument</li> * <li>Creates a temporary queue</li> - * <li>Creates messages containing a property that is the name of the temporary queue</li> - * <li>Fires off a message on the original queue and waits for a response on the temporary queue</li> + * <li>Creates messages containing a property(reply-to) that is the name of the temporary queue</li> + * <li>Fires off a message on the original queue and registers the callbackHandler to listen to the response on the temporary queue</li> + * <li>Start the loop to send all messages</li> + * <li>CallbackHandler keeps listening to the responses and exits if all the messages have been received back or + * if the waiting time for next message is elapsed</li> * </ul> */ public class ServiceRequestingClient implements ExceptionListener @@ -49,6 +52,10 @@ public class ServiceRequestingClient implements ExceptionListener private static final Logger _log = Logger.getLogger(ServiceRequestingClient.class); private long _messageIdentifier = 0; + + // time for which callbackHandler should wait for a message before exiting. Default time= 60 secs + private static long _callbackHandlerWaitingTime = 60000; + private String MESSAGE_DATA; private AMQConnection _connection; @@ -71,20 +78,23 @@ public class ServiceRequestingClient implements ExceptionListener private class CallbackHandler implements MessageListener { - private int _expectedMessageCount; - private int _actualMessageCount; private long _startTime; + // The time when the last message was received by the callbackHandler + private long _messageReceivedTime = 0; + private Object _timerCallbackHandler = new Object(); - public CallbackHandler(int expectedMessageCount, long startTime) + public CallbackHandler(long startTime) { - _expectedMessageCount = expectedMessageCount; _startTime = startTime; + // Start the timer thread, which will keep checking if test should exit because the waiting time has elapsed + (new Thread(new TimerThread())).start(); } public void onMessage(Message m) { + _messageReceivedTime = System.currentTimeMillis(); if (_log.isDebugEnabled()) { _log.debug("Message received: " + m); @@ -95,16 +105,15 @@ public class ServiceRequestingClient implements ExceptionListener if (m.propertyExists("timeSent")) { long timeSent = Long.parseLong(m.getStringProperty("timeSent")); - long now = System.currentTimeMillis(); if (_averageLatency == 0) { - _averageLatency = now - timeSent; + _averageLatency = _messageReceivedTime - timeSent; _log.info("Latency " + _averageLatency); } else { - _log.info("Individual latency: " + (now - timeSent)); - _averageLatency = (_averageLatency + (now - timeSent)) / 2; + _log.info("Individual latency: " + (_messageReceivedTime - timeSent)); + _averageLatency = (_averageLatency + (_messageReceivedTime - timeSent)) / 2; _log.info("Average latency now: " + _averageLatency); } } @@ -124,27 +133,91 @@ public class ServiceRequestingClient implements ExceptionListener } checkForMessageID(m); - if (_actualMessageCount == _expectedMessageCount) + + if (_actualMessageCount == _messageCount) { - _completed = true; - notifyWaiter(); - long timeTaken = System.currentTimeMillis() - _startTime; - _log.info("Total time taken to receive " + _expectedMessageCount + " messages was " + - timeTaken + "ms, equivalent to " + - (_expectedMessageCount / (timeTaken / 1000.0)) + " messages per second"); - - try + finishTesting(_actualMessageCount); + } + } + + /** + * sets completed flag to true, closes the callbackHandler connection and notifies the waiter thread, + * so that the callbackHandler can finish listening for messages. This causes the test to finish. + * @param receivedMessageCount + */ + private void finishTesting(int receivedMessageCount) + { + _completed = true; + notifyWaiter(); + notifyTimerThread(); + + long timeTaken = System.currentTimeMillis() - _startTime; + _log.info("***** Result *****"); + _log.info("Total messages received = " + receivedMessageCount); + _log.info("Total time taken to receive " + receivedMessageCount + " messages was " + + timeTaken + "ms, equivalent to " + + (receivedMessageCount / (timeTaken / 1000.0)) + " messages per second"); + + try + { + _connection.close(); + _log.info("Connection closed"); + } + catch (JMSException e) + { + _log.error("Error closing connection"); + } + } + + private void notifyTimerThread() + { + if (_timerCallbackHandler != null) + { + synchronized (_timerCallbackHandler) { - _connection.close(); - _log.info("Connection closed"); + _timerCallbackHandler.notify(); } - catch (JMSException e) + } + } + + /** + * Thread class implementing the timer for callbackHandler. The thread will exit the test if the waiting time + * has elapsed before next message is received. + */ + private class TimerThread implements Runnable + { + public void run() + { + do { - _log.error("Error closing connection"); + try + { + synchronized(_timerCallbackHandler) + { + _timerCallbackHandler.wait(_callbackHandlerWaitingTime); + } + } + catch (InterruptedException ignore) + { + + } + + // exit if callbackHandler has received all messages + if (_completed) + { + _log.info("timer " + new java.util.Date()); + return; + } } + while ((System.currentTimeMillis() - _messageReceivedTime) < _callbackHandlerWaitingTime); + + // waiting time has elapsed, so exit the test + _log.info(""); + _log.info("Exited after waiting for " + _callbackHandlerWaitingTime/1000 + " secs"); + finishTesting(_actualMessageCount); } } - } + } // end of CallbackHandler class /** * Checks if the received AMQ Message ID(delivery tag) is in sequence, by comparing it with the AMQ MessageID @@ -230,7 +303,7 @@ public class ServiceRequestingClient implements ExceptionListener //now start the clock and the test... final long startTime = System.currentTimeMillis(); - messageConsumer.setMessageListener(new CallbackHandler(messageCount, startTime)); + messageConsumer.setMessageListener(new CallbackHandler(startTime)); } catch (JMSException e) { @@ -284,10 +357,12 @@ public class ServiceRequestingClient implements ExceptionListener */ public static void main(String[] args) { - if (args.length < 6) + if ((args.length < 6) || (args.length == 8)) { - System.err.println( - "Usage: ServiceRequestingClient <brokerDetails> <username> <password> <vpath> <command queue name> <number of messages> [<message size>] [<P[ersistent]|N[onPersistent] (Default N)> <T[ransacted]|N[onTransacted] (Default N)>]"); + System.err.println("Usage: ServiceRequestingClient <brokerDetails> <username> <password> <vpath> " + + "<command queue name> <number of messages> [<message size>] " + + "[<P[ersistent]|N[onPersistent] (Default N)> <T[ransacted]|N[onTransacted] (Default N)>] " + + "[<waiting time for response in sec (default 60 sec)>]"); System.exit(1); } try @@ -296,18 +371,24 @@ public class ServiceRequestingClient implements ExceptionListener boolean transactedMode = false; int deliveryMode = DeliveryMode.NON_PERSISTENT; + if (args.length > 6) + { + messageSize = Integer.parseInt(args[6]); + } if (args.length > 7) { - deliveryMode = args[args.length - 2].toUpperCase().charAt(0) == 'P' ? DeliveryMode.PERSISTENT + deliveryMode = args[7].toUpperCase().charAt(0) == 'P' ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT; - transactedMode = args[args.length - 1].toUpperCase().charAt(0) == 'T' ? true : false; + transactedMode = args[8].toUpperCase().charAt(0) == 'T' ? true : false; } - if ((args.length == 9) ||(args.length == 7)) + if (args.length > 9) { - messageSize = Integer.parseInt(args[6]); - } + _callbackHandlerWaitingTime = Long.parseLong(args[9]) * 1000; + } + + _log.info("Each message size = " + messageSize + " bytes"); InetAddress address = InetAddress.getLocalHost(); String clientID = address.getHostName() + System.currentTimeMillis(); @@ -316,6 +397,8 @@ public class ServiceRequestingClient implements ExceptionListener messageSize); Object waiter = new Object(); client.run(waiter); + + // Start a thread to synchronized (waiter) { while (!client.isCompleted()) |