diff options
| author | Martin Ritchie <ritchiem@apache.org> | 2007-01-24 13:16:08 +0000 |
|---|---|---|
| committer | Martin Ritchie <ritchiem@apache.org> | 2007-01-24 13:16:08 +0000 |
| commit | f0ca502e837a105ff43456ae23474f00d56760c6 (patch) | |
| tree | 94f553bbfcc6fad239e47adb901b1f1d24998540 /java/perftests/src/main | |
| parent | 75235567e7eb60055aea01ca6d1def4dfbe555e0 (diff) | |
| download | qpid-python-f0ca502e837a105ff43456ae23474f00d56760c6.tar.gz | |
Updated Async Test for destinations and for signalling completed runs when there is only 1 queue.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@499392 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/perftests/src/main')
4 files changed, 90 insertions, 71 deletions
diff --git a/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingProducer.java b/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingProducer.java index 1891c9b556..debaa0d785 100644 --- a/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingProducer.java +++ b/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingProducer.java @@ -376,9 +376,7 @@ public abstract class AbstractPingProducer implements Runnable, ExceptionListene else
{
_producer.send(destination, message);
- }
-
- commitTx();
+ }
}
protected void doFailover(String broker)
diff --git a/java/perftests/src/main/java/org/apache/qpid/ping/TestPingItself.java b/java/perftests/src/main/java/org/apache/qpid/ping/TestPingItself.java index 786aaa1e08..72d8010fe3 100644 --- a/java/perftests/src/main/java/org/apache/qpid/ping/TestPingItself.java +++ b/java/perftests/src/main/java/org/apache/qpid/ping/TestPingItself.java @@ -26,6 +26,7 @@ import org.apache.log4j.Logger; import org.apache.qpid.requestreply.PingPongProducer; import org.apache.qpid.topic.Config; +import org.apache.qpid.util.concurrent.BooleanLatch; /** * This class is used to test sending and receiving messages to (pingQueue) and from a queue (replyQueue). @@ -71,7 +72,7 @@ public class TestPingItself extends PingPongProducer messageSize, verbose, afterCommit, beforeCommit, afterSend, beforeSend, failOnce, batchSize, noOfDestinations, rate, pubsub); - if (noOfDestinations > 1) + if (noOfDestinations > 0) { createDestinations(noOfDestinations); @@ -84,7 +85,7 @@ public class TestPingItself extends PingPongProducer } } - /** + /** * Sets the replyQueue to be the same as ping queue. */ @Override @@ -97,11 +98,6 @@ public class TestPingItself extends PingPongProducer consumer.setMessageListener(this); } - public void setMessageListener(MessageListener messageListener) throws JMSException - { - getConsumerSession().setMessageListener(messageListener); - } - /** * Starts a ping-pong loop running from the command line. * diff --git a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java index 3b4572d1b3..8a1583c609 100644 --- a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java +++ b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java @@ -172,7 +172,7 @@ public class PingPongBouncer extends AbstractPingClient implements MessageListen }
// Extract all command line parameters.
- String brokerDetails = args[0];
+ String brokerDetails = args[0];
String username = args[1];
String password = args[2];
String virtualpath = args[3];
diff --git a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java index 2c27b48c88..2a8c7de5d7 100644 --- a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java +++ b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java @@ -152,6 +152,8 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable, */
int _throttleBatchSize;
+ private MessageListener _messageListener = null;
+
private PingPongProducer(String brokerDetails, String username, String password, String virtualpath, boolean transacted,
boolean persistent, int messageSize, boolean verbose, boolean afterCommit, boolean beforeCommit,
boolean afterSend, boolean beforeSend, boolean failOnce, int batchSize, int rate)
@@ -225,8 +227,8 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable, _destinationCount = noOfDestinations;
setPubSub(pubsub);
-
- if (noOfDestinations <= 1)
+
+ if (noOfDestinations == 0)
{
if (destinationName != null)
{
@@ -237,8 +239,8 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable, }
else
{
- _logger.error("Queue Name is not specified");
- throw new IllegalArgumentException("Queue Name is not specified");
+ _logger.error("Destination is not specified");
+ throw new IllegalArgumentException("Destination is not specified");
}
}
}
@@ -258,6 +260,7 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable, /**
* Creates the producer to send the pings on. If the tests are with nultiple-destinations, then producer
* is created with null destination, so that any destination can be specified while sending
+ *
* @throws JMSException
*/
public void createProducer() throws JMSException
@@ -281,6 +284,7 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable, /**
* Creates the temporary destination to listen to the responses
+ *
* @param selector
* @throws JMSException
*/
@@ -303,7 +307,7 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable, /**
* Creates consumer instances for each destination. This is used when test is being done with multiple destinations.
- *
+ *
* @param selector
* @throws JMSException
*/
@@ -312,7 +316,7 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable, for (int i = 0; i < getDestinationsCount(); i++)
{
MessageConsumer consumer =
- getConsumerSession().createConsumer(getDestination(i), PREFETCH, false, EXCLUSIVE, selector);
+ getConsumerSession().createConsumer(getDestination(i), PREFETCH, false, EXCLUSIVE, selector);
consumer.setMessageListener(this);
}
}
@@ -353,8 +357,8 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable, if (args.length < 2)
{
System.err.println("Usage: TestPingPublisher <brokerDetails> <virtual path> [verbose (true/false)] " +
- "[transacted (true/false)] [persistent (true/false)] [message size in bytes] [batchsize]" +
- " [rate] [pubsub(true/false)]");
+ "[transacted (true/false)] [persistent (true/false)] [message size in bytes] [batchsize]" +
+ " [rate] [pubsub(true/false)]");
System.exit(0);
}
@@ -366,7 +370,7 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable, int messageSize = (args.length >= 6) ? Integer.parseInt(args[5]) : DEFAULT_MESSAGE_SIZE;
int batchSize = (args.length >= 7) ? Integer.parseInt(args[6]) : 1;
int rate = (args.length >= 8) ? Integer.parseInt(args[7]) : 0;
- boolean ispubsub = (args.length >= 9) ? Boolean.parseBoolean(args[8]) : false;
+ boolean ispubsub = (args.length >= 9) ? Boolean.parseBoolean(args[8]) : false;
boolean afterCommit = false;
boolean beforeCommit = false;
@@ -408,9 +412,9 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable, // Create a ping producer to handle the request/wait/reply cycle.
PingPongProducer pingProducer = new PingPongProducer(brokerDetails, "guest", "guest", virtualpath,
- PING_DESTINATION_NAME, null, transacted, persistent, messageSize, verbose,
- afterCommit, beforeCommit, afterSend, beforeSend, failOnce, batchSize,
- 0, rate, ispubsub);
+ PING_DESTINATION_NAME, null, transacted, persistent, messageSize, verbose,
+ afterCommit, beforeCommit, afterSend, beforeSend, failOnce, batchSize,
+ 0, rate, ispubsub);
pingProducer.getConnection().start();
@@ -421,7 +425,7 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable, // Ensure that the ping pong producer is registered to listen for exceptions on the connection too.
pingProducer.getConnection().setExceptionListener(pingProducer);
-
+
// Create the ping loop thread and run it until it is terminated by the shutdown hook or exception.
Thread pingThread = new Thread(pingProducer);
pingThread.run();
@@ -444,15 +448,19 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable, Message first = getTestMessage(_replyDestination, 0, false);
sendMessage(first);
+ commitTx();
+
try
{
Thread.sleep(100);
}
catch (InterruptedException ignore)
{
-
+
}
}
+
+
}
/**
@@ -482,6 +490,12 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable, {
_logger.debug("Reply was expected, decrementing the latch for the id.");
trafficLight.countDown();
+
+ if (_messageListener != null)
+ {
+ _messageListener.onMessage(message);
+ }
+
}
else
{
@@ -519,14 +533,52 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable, public int pingAndWaitForReply(Message message, int numPings, long timeout) throws JMSException, InterruptedException
{
// Put a unique correlation id on the message before sending it.
- String messageCorrelationId = Long.toString(idGenerator.incrementAndGet());
- message.setJMSCorrelationID(messageCorrelationId);
+ String messageCorrelationId = Long.toString(getNewID());
+
+
+ pingNoWaitForReply(message, numPings, messageCorrelationId);
+
+ CountDownLatch trafficLight = trafficLights.get(messageCorrelationId);
+ // Block the current thread until a reply to the message is received, or it times out.
+ trafficLight.await(timeout, TimeUnit.MILLISECONDS);
+
+ // Work out how many replies were receieved.
+ int numReplies = numPings - (int) trafficLight.getCount();
+
+ if ((numReplies < numPings) && _verbose)
+ {
+ _logger.info("Timed out (" + timeout + " ms) before all replies received on id, " + messageCorrelationId);
+ }
+ else if (_verbose)
+ {
+ _logger.info("Got all replies on id, " + messageCorrelationId);
+ }
+ return numReplies;
+ }
+
+ public long getNewID()
+ {
+ return idGenerator.incrementAndGet();
+ }
+
+ /*
+ * Sends the specified ping message but does not wait for a correlating reply.
+ *
+ * @param message The message to send.
+ * @param numPings The number of pings to send.
+ * @return The reply, or null if no reply arrives before the timeout.
+ * @throws JMSException All underlying JMSExceptions are allowed to fall through.
+ */
+ public void pingNoWaitForReply(Message message, int numPings, String messageCorrelationId) throws JMSException, InterruptedException
+ {
// Create a count down latch to count the number of replies with. This is created before the message is sent
// so that the message is not received before the count down is created.
CountDownLatch trafficLight = new CountDownLatch(numPings);
trafficLights.put(messageCorrelationId, trafficLight);
+ message.setJMSCorrelationID(messageCorrelationId);
+
// Set up a committed flag to detect uncommitted message at the end of the send loop. This may occurr if the
// transaction batch size is not a factor of the number of pings. In which case an extra commit at the end is
// needed.
@@ -579,43 +631,6 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable, _logger.info(timestampFormatter.format(new Date()) + ": Pinged at with correlation id, " + messageCorrelationId);
}
- // Block the current thread until a reply to the message is received, or it times out.
- trafficLight.await(timeout, TimeUnit.MILLISECONDS);
-
- // Work out how many replies were receieved.
- int numReplies = numPings - (int) trafficLight.getCount();
-
- if ((numReplies < numPings) && _verbose)
- {
- _logger.info("Timed out (" + timeout + " ms) before all replies received on id, " + messageCorrelationId);
- }
- else if (_verbose)
- {
- _logger.info("Got all replies on id, " + messageCorrelationId);
- }
-
- return numReplies;
- }
-
- /*
- * Sends the specified ping message but does not wait for a correlating reply.
- *
- * @param message The message to send.
- * @param numPings The number of pings to send.
- * @return The reply, or null if no reply arrives before the timeout.
- * @throws JMSException All underlying JMSExceptions are allowed to fall through.
- */
- public void pingNoWaitForReply(Message message, int numPings) throws JMSException, InterruptedException
- {
- for (int i = 0; i < numPings; i++)
- {
- sendMessage(message);
-
- if (_verbose)
- {
- _logger.info(timestampFormatter.format(new Date()) + ": Pinged at.");
- }
- }
}
/**
@@ -658,14 +673,24 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable, _replyDestination = destination;
}
+ public void setMessageListener(MessageListener messageListener)
+ {
+ _messageListener = messageListener;
+ }
+
+ public CountDownLatch getEndLock(String correlationID)
+ {
+ return trafficLights.get(correlationID);
+ }
+
/*
- * When the test is being performed with multiple queues, then this method will be used, which has a loop to
- * pick up the next queue from the queues list and sends message to it.
- *
- * @param message
- * @param numPings
- * @throws JMSException
- */
+ * When the test is being performed with multiple queues, then this method will be used, which has a loop to
+ * pick up the next queue from the queues list and sends message to it.
+ *
+ * @param message
+ * @param numPings
+ * @throws JMSException
+ */
/*private void pingMultipleQueues(Message message, int numPings) throws JMSException
{
int queueIndex = 0;
|
