summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingProducer.java4
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/ping/TestPingItself.java10
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java2
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java145
-rw-r--r--java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java54
-rw-r--r--java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java10
6 files changed, 123 insertions, 102 deletions
diff --git a/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingProducer.java b/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingProducer.java
index 1891c9b556..debaa0d785 100644
--- a/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingProducer.java
+++ b/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingProducer.java
@@ -376,9 +376,7 @@ public abstract class AbstractPingProducer implements Runnable, ExceptionListene
else
{
_producer.send(destination, message);
- }
-
- commitTx();
+ }
}
protected void doFailover(String broker)
diff --git a/java/perftests/src/main/java/org/apache/qpid/ping/TestPingItself.java b/java/perftests/src/main/java/org/apache/qpid/ping/TestPingItself.java
index 786aaa1e08..72d8010fe3 100644
--- a/java/perftests/src/main/java/org/apache/qpid/ping/TestPingItself.java
+++ b/java/perftests/src/main/java/org/apache/qpid/ping/TestPingItself.java
@@ -26,6 +26,7 @@ import org.apache.log4j.Logger;
import org.apache.qpid.requestreply.PingPongProducer;
import org.apache.qpid.topic.Config;
+import org.apache.qpid.util.concurrent.BooleanLatch;
/**
* This class is used to test sending and receiving messages to (pingQueue) and from a queue (replyQueue).
@@ -71,7 +72,7 @@ public class TestPingItself extends PingPongProducer
messageSize, verbose, afterCommit, beforeCommit, afterSend, beforeSend, failOnce, batchSize,
noOfDestinations, rate, pubsub);
- if (noOfDestinations > 1)
+ if (noOfDestinations > 0)
{
createDestinations(noOfDestinations);
@@ -84,7 +85,7 @@ public class TestPingItself extends PingPongProducer
}
}
- /**
+ /**
* Sets the replyQueue to be the same as ping queue.
*/
@Override
@@ -97,11 +98,6 @@ public class TestPingItself extends PingPongProducer
consumer.setMessageListener(this);
}
- public void setMessageListener(MessageListener messageListener) throws JMSException
- {
- getConsumerSession().setMessageListener(messageListener);
- }
-
/**
* Starts a ping-pong loop running from the command line.
*
diff --git a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java
index 3b4572d1b3..8a1583c609 100644
--- a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java
+++ b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java
@@ -172,7 +172,7 @@ public class PingPongBouncer extends AbstractPingClient implements MessageListen
}
// Extract all command line parameters.
- String brokerDetails = args[0];
+ String brokerDetails = args[0];
String username = args[1];
String password = args[2];
String virtualpath = args[3];
diff --git a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
index 2c27b48c88..2a8c7de5d7 100644
--- a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
+++ b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
@@ -152,6 +152,8 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable,
*/
int _throttleBatchSize;
+ private MessageListener _messageListener = null;
+
private PingPongProducer(String brokerDetails, String username, String password, String virtualpath, boolean transacted,
boolean persistent, int messageSize, boolean verbose, boolean afterCommit, boolean beforeCommit,
boolean afterSend, boolean beforeSend, boolean failOnce, int batchSize, int rate)
@@ -225,8 +227,8 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable,
_destinationCount = noOfDestinations;
setPubSub(pubsub);
-
- if (noOfDestinations <= 1)
+
+ if (noOfDestinations == 0)
{
if (destinationName != null)
{
@@ -237,8 +239,8 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable,
}
else
{
- _logger.error("Queue Name is not specified");
- throw new IllegalArgumentException("Queue Name is not specified");
+ _logger.error("Destination is not specified");
+ throw new IllegalArgumentException("Destination is not specified");
}
}
}
@@ -258,6 +260,7 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable,
/**
* Creates the producer to send the pings on. If the tests are with nultiple-destinations, then producer
* is created with null destination, so that any destination can be specified while sending
+ *
* @throws JMSException
*/
public void createProducer() throws JMSException
@@ -281,6 +284,7 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable,
/**
* Creates the temporary destination to listen to the responses
+ *
* @param selector
* @throws JMSException
*/
@@ -303,7 +307,7 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable,
/**
* Creates consumer instances for each destination. This is used when test is being done with multiple destinations.
- *
+ *
* @param selector
* @throws JMSException
*/
@@ -312,7 +316,7 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable,
for (int i = 0; i < getDestinationsCount(); i++)
{
MessageConsumer consumer =
- getConsumerSession().createConsumer(getDestination(i), PREFETCH, false, EXCLUSIVE, selector);
+ getConsumerSession().createConsumer(getDestination(i), PREFETCH, false, EXCLUSIVE, selector);
consumer.setMessageListener(this);
}
}
@@ -353,8 +357,8 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable,
if (args.length < 2)
{
System.err.println("Usage: TestPingPublisher <brokerDetails> <virtual path> [verbose (true/false)] " +
- "[transacted (true/false)] [persistent (true/false)] [message size in bytes] [batchsize]" +
- " [rate] [pubsub(true/false)]");
+ "[transacted (true/false)] [persistent (true/false)] [message size in bytes] [batchsize]" +
+ " [rate] [pubsub(true/false)]");
System.exit(0);
}
@@ -366,7 +370,7 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable,
int messageSize = (args.length >= 6) ? Integer.parseInt(args[5]) : DEFAULT_MESSAGE_SIZE;
int batchSize = (args.length >= 7) ? Integer.parseInt(args[6]) : 1;
int rate = (args.length >= 8) ? Integer.parseInt(args[7]) : 0;
- boolean ispubsub = (args.length >= 9) ? Boolean.parseBoolean(args[8]) : false;
+ boolean ispubsub = (args.length >= 9) ? Boolean.parseBoolean(args[8]) : false;
boolean afterCommit = false;
boolean beforeCommit = false;
@@ -408,9 +412,9 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable,
// Create a ping producer to handle the request/wait/reply cycle.
PingPongProducer pingProducer = new PingPongProducer(brokerDetails, "guest", "guest", virtualpath,
- PING_DESTINATION_NAME, null, transacted, persistent, messageSize, verbose,
- afterCommit, beforeCommit, afterSend, beforeSend, failOnce, batchSize,
- 0, rate, ispubsub);
+ PING_DESTINATION_NAME, null, transacted, persistent, messageSize, verbose,
+ afterCommit, beforeCommit, afterSend, beforeSend, failOnce, batchSize,
+ 0, rate, ispubsub);
pingProducer.getConnection().start();
@@ -421,7 +425,7 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable,
// Ensure that the ping pong producer is registered to listen for exceptions on the connection too.
pingProducer.getConnection().setExceptionListener(pingProducer);
-
+
// Create the ping loop thread and run it until it is terminated by the shutdown hook or exception.
Thread pingThread = new Thread(pingProducer);
pingThread.run();
@@ -444,15 +448,19 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable,
Message first = getTestMessage(_replyDestination, 0, false);
sendMessage(first);
+ commitTx();
+
try
{
Thread.sleep(100);
}
catch (InterruptedException ignore)
{
-
+
}
}
+
+
}
/**
@@ -482,6 +490,12 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable,
{
_logger.debug("Reply was expected, decrementing the latch for the id.");
trafficLight.countDown();
+
+ if (_messageListener != null)
+ {
+ _messageListener.onMessage(message);
+ }
+
}
else
{
@@ -519,14 +533,52 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable,
public int pingAndWaitForReply(Message message, int numPings, long timeout) throws JMSException, InterruptedException
{
// Put a unique correlation id on the message before sending it.
- String messageCorrelationId = Long.toString(idGenerator.incrementAndGet());
- message.setJMSCorrelationID(messageCorrelationId);
+ String messageCorrelationId = Long.toString(getNewID());
+
+
+ pingNoWaitForReply(message, numPings, messageCorrelationId);
+
+ CountDownLatch trafficLight = trafficLights.get(messageCorrelationId);
+ // Block the current thread until a reply to the message is received, or it times out.
+ trafficLight.await(timeout, TimeUnit.MILLISECONDS);
+
+ // Work out how many replies were receieved.
+ int numReplies = numPings - (int) trafficLight.getCount();
+
+ if ((numReplies < numPings) && _verbose)
+ {
+ _logger.info("Timed out (" + timeout + " ms) before all replies received on id, " + messageCorrelationId);
+ }
+ else if (_verbose)
+ {
+ _logger.info("Got all replies on id, " + messageCorrelationId);
+ }
+ return numReplies;
+ }
+
+ public long getNewID()
+ {
+ return idGenerator.incrementAndGet();
+ }
+
+ /*
+ * Sends the specified ping message but does not wait for a correlating reply.
+ *
+ * @param message The message to send.
+ * @param numPings The number of pings to send.
+ * @return The reply, or null if no reply arrives before the timeout.
+ * @throws JMSException All underlying JMSExceptions are allowed to fall through.
+ */
+ public void pingNoWaitForReply(Message message, int numPings, String messageCorrelationId) throws JMSException, InterruptedException
+ {
// Create a count down latch to count the number of replies with. This is created before the message is sent
// so that the message is not received before the count down is created.
CountDownLatch trafficLight = new CountDownLatch(numPings);
trafficLights.put(messageCorrelationId, trafficLight);
+ message.setJMSCorrelationID(messageCorrelationId);
+
// Set up a committed flag to detect uncommitted message at the end of the send loop. This may occurr if the
// transaction batch size is not a factor of the number of pings. In which case an extra commit at the end is
// needed.
@@ -579,43 +631,6 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable,
_logger.info(timestampFormatter.format(new Date()) + ": Pinged at with correlation id, " + messageCorrelationId);
}
- // Block the current thread until a reply to the message is received, or it times out.
- trafficLight.await(timeout, TimeUnit.MILLISECONDS);
-
- // Work out how many replies were receieved.
- int numReplies = numPings - (int) trafficLight.getCount();
-
- if ((numReplies < numPings) && _verbose)
- {
- _logger.info("Timed out (" + timeout + " ms) before all replies received on id, " + messageCorrelationId);
- }
- else if (_verbose)
- {
- _logger.info("Got all replies on id, " + messageCorrelationId);
- }
-
- return numReplies;
- }
-
- /*
- * Sends the specified ping message but does not wait for a correlating reply.
- *
- * @param message The message to send.
- * @param numPings The number of pings to send.
- * @return The reply, or null if no reply arrives before the timeout.
- * @throws JMSException All underlying JMSExceptions are allowed to fall through.
- */
- public void pingNoWaitForReply(Message message, int numPings) throws JMSException, InterruptedException
- {
- for (int i = 0; i < numPings; i++)
- {
- sendMessage(message);
-
- if (_verbose)
- {
- _logger.info(timestampFormatter.format(new Date()) + ": Pinged at.");
- }
- }
}
/**
@@ -658,14 +673,24 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable,
_replyDestination = destination;
}
+ public void setMessageListener(MessageListener messageListener)
+ {
+ _messageListener = messageListener;
+ }
+
+ public CountDownLatch getEndLock(String correlationID)
+ {
+ return trafficLights.get(correlationID);
+ }
+
/*
- * When the test is being performed with multiple queues, then this method will be used, which has a loop to
- * pick up the next queue from the queues list and sends message to it.
- *
- * @param message
- * @param numPings
- * @throws JMSException
- */
+ * When the test is being performed with multiple queues, then this method will be used, which has a loop to
+ * pick up the next queue from the queues list and sends message to it.
+ *
+ * @param message
+ * @param numPings
+ * @throws JMSException
+ */
/*private void pingMultipleQueues(Message message, int numPings) throws JMSException
{
int queueIndex = 0;
diff --git a/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java b/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java
index 5a1502ae6b..edd3c909c6 100644
--- a/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java
+++ b/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java
@@ -37,13 +37,11 @@ import java.util.concurrent.CountDownLatch;
public class PingAsyncTestPerf extends PingTestPerf //implements TimingControllerAware
{
- private static Logger _logger = Logger.getLogger(PingAsyncTestPerf.class);
+// private static Logger _logger = Logger.getLogger(PingAsyncTestPerf.class);
// private TimingController _timingController;
-//
-// private final CountDownLatch _completedLock = new CountDownLatch(1);
-//
-// private AsyncMessageListener _listener;
+
+// private AsyncMessageListener _listener;
private volatile boolean _done = false;
@@ -51,7 +49,7 @@ public class PingAsyncTestPerf extends PingTestPerf //implements TimingControlle
{
super(name);
}
-//
+
// /**
// * Compile all the tests into a test suite.
// */
@@ -79,13 +77,16 @@ public class PingAsyncTestPerf extends PingTestPerf //implements TimingControlle
// String username = "guest";
// String password = "guest";
// String virtualpath = testParameters.getProperty(VIRTUAL_PATH_PROPNAME);
-// int queueCount = Integer.parseInt(testParameters.getProperty(PING_QUEUE_COUNT_PROPNAME));
-// String queueName = testParameters.getProperty(PING_QUEUE_NAME_PROPNAME);
+// int destinationscount = Integer.parseInt(testParameters.getProperty(PING_DESTINATION_COUNT_PROPNAME));
+// String destinationname = testParameters.getProperty(PING_DESTINATION_NAME_PROPNAME);
// boolean persistent = Boolean.parseBoolean(testParameters.getProperty(PERSISTENT_MODE_PROPNAME));
// boolean transacted = Boolean.parseBoolean(testParameters.getProperty(TRANSACTED_PROPNAME));
// String selector = null;
// boolean verbose = Boolean.parseBoolean(testParameters.getProperty(VERBOSE_OUTPUT_PROPNAME));
// int messageSize = Integer.parseInt(testParameters.getProperty(MESSAGE_SIZE_PROPNAME));
+// int rate = Integer.parseInt(testParameters.getProperty(RATE_PROPNAME));
+// boolean pubsub = Boolean.parseBoolean(testParameters.getProperty(IS_PUBSUB_PROPNAME));
+//
//
// boolean afterCommit = Boolean.parseBoolean(testParameters.getProperty(FAIL_AFTER_COMMIT));
// boolean beforeCommit = Boolean.parseBoolean(testParameters.getProperty(FAIL_BEFORE_COMMIT));
@@ -96,18 +97,16 @@ public class PingAsyncTestPerf extends PingTestPerf //implements TimingControlle
// int batchSize = Integer.parseInt(testParameters.getProperty(BATCH_SIZE));
// int commitbatchSize = Integer.parseInt(testParameters.getProperty(COMMIT_BATCH_SIZE));
//
-// int rate = Integer.parseInt(testParameters.getProperty(RATE_PROPNAME));
-//
// // This is synchronized because there is a race condition, which causes one connection to sleep if
// // all threads try to create connection concurrently
// synchronized (this)
// {
// // Establish a client to ping a Queue and listen the reply back from same Queue
// perThreadSetup._pingItselfClient = new TestPingItself(brokerDetails, username, password, virtualpath,
-// queueName, selector, transacted, persistent,
+// destinationname, selector, transacted, persistent,
// messageSize, verbose,
// afterCommit, beforeCommit, afterSend, beforeSend, failOnce,
-// commitbatchSize, queueCount, rate);
+// commitbatchSize, destinationscount, rate, pubsub);
//
//
// _listener = new AsyncMessageListener(batchSize);
@@ -154,9 +153,11 @@ public class PingAsyncTestPerf extends PingTestPerf //implements TimingControlle
// // start the test
// long timeout = Long.parseLong(testParameters.getProperty(TIMEOUT_PROPNAME));
//
+// String correlationID = Long.toString(perThreadSetup._pingItselfClient.getNewID());
+//
// try
// {
-// perThreadSetup._pingItselfClient.pingNoWaitForReply(msg, numPings);
+// perThreadSetup._pingItselfClient.pingNoWaitForReply(msg, numPings, correlationID);
// }
// catch (JMSException e)
// {
@@ -174,7 +175,7 @@ public class PingAsyncTestPerf extends PingTestPerf //implements TimingControlle
// {
// _logger.info("awating test finish");
//
-// _completedLock.await();
+// perThreadSetup._pingItselfClient.getEndLock(correlationID).await();
// }
// catch (InterruptedException e)
// {
@@ -193,7 +194,7 @@ public class PingAsyncTestPerf extends PingTestPerf //implements TimingControlle
// Assert.fail("The ping timed out after " + timeout + " ms. Messages Sent = " + numPings + ", MessagesReceived = " + numReplies);
// try
// {
-// _timingController.completeTest(false);
+// _timingController.completeTest(false, numPings - numReplies);
// }
// catch (InterruptedException e)
// {
@@ -202,7 +203,6 @@ public class PingAsyncTestPerf extends PingTestPerf //implements TimingControlle
// }
// }
//
-//
// public void setTimingController(TimingController timingController)
// {
// _timingController = timingController;
@@ -216,7 +216,7 @@ public class PingAsyncTestPerf extends PingTestPerf //implements TimingControlle
//
// private class AsyncMessageListener implements MessageListener
// {
-// private int _messageRecevied;
+// private int _messageReceived;
// private int _totalMessages;
// private int _batchSize;
//
@@ -224,14 +224,14 @@ public class PingAsyncTestPerf extends PingTestPerf //implements TimingControlle
// {
// _batchSize = batchSize;
// _totalMessages = totalMessages;
-// _messageRecevied = 0;
+// _messageReceived = 0;
// }
//
// public AsyncMessageListener(int batchSize)
// {
// _batchSize = batchSize;
// _totalMessages = -1;
-// _messageRecevied = 0;
+// _messageReceived = 0;
// }
//
// public void setTotalMessages(int newTotal)
@@ -242,14 +242,16 @@ public class PingAsyncTestPerf extends PingTestPerf //implements TimingControlle
// public void onMessage(Message message)
// {
// _logger.info("Message Recevied");
+//
+// _messageReceived++;
+//
// try
// {
-// _messageRecevied++;
-// if (_messageRecevied == _batchSize)
+// if (_messageReceived == _batchSize)
// {
// if (_timingController != null)
// {
-// _timingController.completeTest(true);
+// _timingController.completeTest(true, _batchSize);
// }
// }
// }
@@ -258,7 +260,7 @@ public class PingAsyncTestPerf extends PingTestPerf //implements TimingControlle
// doDone();
// }
//
-// if (_totalMessages == -1 || _messageRecevied == _totalMessages)
+// if (_totalMessages == -1 || _messageReceived == _totalMessages)
// {
// _logger.info("Test Completed.. signalling");
// doDone();
@@ -268,10 +270,9 @@ public class PingAsyncTestPerf extends PingTestPerf //implements TimingControlle
// private void doDone()
// {
// _done = true;
-// _completedLock.countDown();
// try
// {
-// _timingController.completeTest(true);
+// _timingController.completeTest(true, _totalMessages - _messageReceived);
// }
// catch (InterruptedException e)
// {
@@ -281,8 +282,9 @@ public class PingAsyncTestPerf extends PingTestPerf //implements TimingControlle
//
// public int getReplyCount()
// {
-// return _messageRecevied;
+// return _messageReceived;
// }
+//
// }
}
diff --git a/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java b/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java
index 88a791ecb3..5c4a9f189f 100644
--- a/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java
+++ b/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java
@@ -42,17 +42,17 @@ public class PingTestPerf extends AsymptoticTestCase //implements TimingControll
/**
* Holds the name of the property to get the test message size from.
*/
- private static final String MESSAGE_SIZE_PROPNAME = "messagesize";
+ protected static final String MESSAGE_SIZE_PROPNAME = "messagesize";
/**
* Holds the name of the property to get the ping queue name from.
*/
- private static final String PING_DESTINATION_NAME_PROPNAME = "destinationname";
+ protected static final String PING_DESTINATION_NAME_PROPNAME = "destinationname";
/**
* holds the queue count, if the test is being performed with multiple queues
*/
- private static final String PING_DESTINATION_COUNT_PROPNAME = "destinationscount";
+ protected static final String PING_DESTINATION_COUNT_PROPNAME = "destinationscount";
/**
* Holds the name of the property to get the test delivery mode from.
@@ -85,7 +85,7 @@ public class PingTestPerf extends AsymptoticTestCase //implements TimingControll
protected static final String VERBOSE_OUTPUT_PROPNAME = "verbose";
/** Holds the true or false depending on wether it is P2P test or PubSub */
- private static final String IS_PUBSUB_PROPNAME = "pubsub";
+ protected static final String IS_PUBSUB_PROPNAME = "pubsub";
/**
* Holds the size of message body to attach to the ping messages.
*/
@@ -168,7 +168,7 @@ public class PingTestPerf extends AsymptoticTestCase //implements TimingControll
setSystemPropertyIfNull(BROKER_PROPNAME, BROKER_DEFAULT);
setSystemPropertyIfNull(VIRTUAL_PATH_PROPNAME, VIRTUAL_PATH_DEFAULT);
setSystemPropertyIfNull(TIMEOUT_PROPNAME, Long.toString(TIMEOUT_DEFAULT));
- setSystemPropertyIfNull(PING_DESTINATION_COUNT_PROPNAME, Integer.toString(1));
+ setSystemPropertyIfNull(PING_DESTINATION_COUNT_PROPNAME, Integer.toString(0));
setSystemPropertyIfNull(VERBOSE_OUTPUT_PROPNAME, Boolean.toString(false));
setSystemPropertyIfNull(RATE_PROPNAME, Integer.toString(RATE_DEFAULT));
setSystemPropertyIfNull(IS_PUBSUB_PROPNAME, Boolean.toString(false));