summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBhupendra Bhusman Bhardwaj <bhupendrab@apache.org>2007-01-23 14:41:33 +0000
committerBhupendra Bhusman Bhardwaj <bhupendrab@apache.org>2007-01-23 14:41:33 +0000
commitf33d7d973c3d55a30fb94c91e721a865ec5689ab (patch)
treeabf49eba6c19f205a50ba40c1a576d747ba8e9da
parent5d9f6b3443c51f437f2b9c0f997e2d26b492dcc6 (diff)
downloadqpid-python-f33d7d973c3d55a30fb94c91e721a865ec5689ab.tar.gz
updated the test for testing with multiple threads
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@499036 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingItself.java78
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java50
-rw-r--r--qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java28
3 files changed, 54 insertions, 102 deletions
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingItself.java b/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingItself.java
index 6bfc2af541..4cbe9e8585 100644
--- a/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingItself.java
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingItself.java
@@ -36,9 +36,10 @@ public class TestPingItself extends PingPongProducer
private static final Logger _logger = Logger.getLogger(TestPingItself.class);
/**
- * This creates a client for pinging to a Queue. There will be one producer and one consumer instance. Consumer
- * listening to the same Queue, producer is sending to
- *
+ * If queueCount is <= 1 : There will be one Queue and one consumer instance for the test
+ * If queueCount is > 1 : This creats a client for tests with multiple queues. Creates as many consumer instances
+ * as there are queues, each listening to a Queue. A producer is created which picks up a queue from
+ * the list of queues to send message
* @param brokerDetails
* @param username
* @param password
@@ -53,57 +54,31 @@ public class TestPingItself extends PingPongProducer
* @param beforeCommit
* @param afterSend
* @param beforeSend
- * @param batchSize
- * @throws Exception
- */
- public TestPingItself(String brokerDetails, String username, String password, String virtualpath, String queueName,
- String selector, boolean transacted, boolean persistent, int messageSize, boolean verbose,
- boolean afterCommit, boolean beforeCommit, boolean afterSend, boolean beforeSend, boolean failOnce,
- int batchSize)
- throws Exception
- {
- super(brokerDetails, username, password, virtualpath, queueName, selector, transacted, persistent, messageSize,
- verbose, afterCommit, beforeCommit, afterSend, beforeSend, failOnce, batchSize, 0);
- }
-
- /**
- * This creats a client for tests with multiple queues. Creates as many consumer instances as there are queues,
- * each listening to a Queue. A producer is created which picks up a queue from the list of queues to send message.
- *
- * @param brokerDetails
- * @param username
- * @param password
- * @param virtualpath
- * @param selector
- * @param transacted
- * @param persistent
- * @param messageSize
- * @param verbose
- * @param afterCommit
- * @param beforeCommit
- * @param afterSend
- * @param beforeSend
+ * @param failOnce
* @param batchSize
* @param queueCount
* @throws Exception
*/
- public TestPingItself(String brokerDetails, String username, String password, String virtualpath,
+ public TestPingItself(String brokerDetails, String username, String password, String virtualpath, String queueName,
String selector, boolean transacted, boolean persistent, int messageSize, boolean verbose,
boolean afterCommit, boolean beforeCommit, boolean afterSend, boolean beforeSend, boolean failOnce,
int batchSize, int queueCount)
throws Exception
{
- super(brokerDetails, username, password, virtualpath, null, null, transacted, persistent, messageSize,
+ super(brokerDetails, username, password, virtualpath, queueName, selector, transacted, persistent, messageSize,
verbose, afterCommit, beforeCommit, afterSend, beforeSend, failOnce, batchSize, queueCount);
- createQueues(queueCount);
+ if (queueCount > 1)
+ {
+ createQueues(queueCount);
- _persistent = persistent;
- _messageSize = messageSize;
- _verbose = verbose;
+ _persistent = persistent;
+ _messageSize = messageSize;
+ _verbose = verbose;
- createConsumers(selector);
- createProducer();
+ createConsumers(selector);
+ createProducer();
+ }
}
/**
@@ -136,7 +111,7 @@ public class TestPingItself extends PingPongProducer
boolean persistent = config.usePersistentMessages();
int messageSize = config.getPayload() != 0 ? config.getPayload() : DEFAULT_MESSAGE_SIZE;
int messageCount = config.getMessages();
- int queueCount = config.getQueueCount();
+ int queueCount = config.getQueueCount() != 0 ? config.getQueueCount() : 1;
int batchSize = config.getBatchSize() != 0 ? config.getBatchSize() : BATCH_SIZE;
String queue = "ping_" + System.currentTimeMillis();
@@ -182,22 +157,11 @@ public class TestPingItself extends PingPongProducer
}
}
- TestPingItself pingItself = null;
// Create a ping producer to handle the request/wait/reply cycle.
- if (queueCount > 1)
- {
- pingItself = new TestPingItself(brokerDetails, "guest", "guest", virtualpath, null,
- transacted, persistent, messageSize, verbose,
- afterCommit, beforeCommit, afterSend, beforeSend, failOnce,
- batchSize, queueCount);
- }
- else
- {
- pingItself = new TestPingItself(brokerDetails, "guest", "guest", virtualpath, queue, null,
- transacted, persistent, messageSize, verbose,
- afterCommit, beforeCommit, afterSend, beforeSend, failOnce,
- batchSize);
- }
+ TestPingItself pingItself = new TestPingItself(brokerDetails, "guest", "guest", virtualpath, queue, null,
+ transacted, persistent, messageSize, verbose,
+ afterCommit, beforeCommit, afterSend, beforeSend, failOnce,
+ batchSize, queueCount);
pingItself.getConnection().start();
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java b/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
index c0a041037d..6ff0af214b 100644
--- a/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
@@ -93,10 +93,6 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable,
*/
protected static final int BATCH_SIZE = 100;
- /**
- * Keeps track of the ping producer instance used in the run loop.
- */
- private static PingPongProducer _pingProducer;
protected static final int PREFETCH = 100;
protected static final boolean NO_LOCAL = true;
protected static final boolean EXCLUSIVE = false;
@@ -109,7 +105,7 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable,
/**
* A source for providing sequential unique correlation ids.
*/
- private AtomicLong idGenerator = new AtomicLong(0L);
+ private static AtomicLong idGenerator = new AtomicLong(0L);
/**
* Holds the queue to send the ping replies to.
@@ -134,7 +130,7 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable,
/**
* Holds a map from message ids to latches on which threads wait for replies.
*/
- private Map<String, CountDownLatch> trafficLights = new HashMap<String, CountDownLatch>();
+ private static Map<String, CountDownLatch> trafficLights = new HashMap<String, CountDownLatch>();
/**
* Used to indicate that the ping loop should print out whenever it pings.
@@ -192,21 +188,21 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable,
this(brokerDetails, username, password, virtualpath, transacted, persistent, messageSize, verbose,
afterCommit, beforeCommit, afterSend, beforeSend, failOnce, batchSize);
- if (queueName != null)
- {
- _pingQueue = new AMQQueue(queueName);
- // Create producer and the consumer
- createProducer();
- createConsumer(selector);
- }
- else if (queueCount > 0)
- {
- _queueCount = queueCount;
- }
- else
+ _queueCount = queueCount;
+ if (queueCount <= 1)
{
- _logger.error("Queue Count is zero and no queueName specified. One must be set.");
- throw new IllegalArgumentException("Queue Count is zero and no queueName specified. One must be set.");
+ if (queueName != null)
+ {
+ _pingQueue = new AMQQueue(queueName);
+ // Create producer and the consumer
+ createProducer();
+ createConsumer(selector);
+ }
+ else
+ {
+ _logger.error("Queue Name is not specified");
+ throw new IllegalArgumentException("Queue Name is not specified");
+ }
}
}
@@ -351,23 +347,23 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable,
}
// Create a ping producer to handle the request/wait/reply cycle.
- _pingProducer = new PingPongProducer(brokerDetails, "guest", "guest", virtualpath, PING_QUEUE_NAME, null, transacted,
+ PingPongProducer pingProducer = new PingPongProducer(brokerDetails, "guest", "guest", virtualpath, PING_QUEUE_NAME, null, transacted,
persistent, messageSize, verbose,
afterCommit, beforeCommit, afterSend, beforeSend, failOnce,
batchSize, 0);
- _pingProducer.getConnection().start();
+ pingProducer.getConnection().start();
// Run a few priming pings to remove warm up time from test results.
- _pingProducer.prime(PRIMING_LOOPS);
+ pingProducer.prime(PRIMING_LOOPS);
// Create a shutdown hook to terminate the ping-pong producer.
- Runtime.getRuntime().addShutdownHook(_pingProducer.getShutdownHook());
+ Runtime.getRuntime().addShutdownHook(pingProducer.getShutdownHook());
// Ensure that the ping pong producer is registered to listen for exceptions on the connection too.
- _pingProducer.getConnection().setExceptionListener(_pingProducer);
+ pingProducer.getConnection().setExceptionListener(pingProducer);
// Create the ping loop thread and run it until it is terminated by the shutdown hook or exception.
- Thread pingThread = new Thread(_pingProducer);
+ Thread pingThread = new Thread(pingProducer);
pingThread.run();
pingThread.join();
}
@@ -502,7 +498,7 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable,
if ((numReplies < numPings) && _verbose)
{
- _logger.info("Timed out before all replies received on id, " + messageCorrelationId);
+ _logger.info("Timed out (" + timeout + " ms) before all replies received on id, " + messageCorrelationId);
}
else if (_verbose)
{
diff --git a/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java b/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java
index 446888f8c1..d476080720 100644
--- a/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java
+++ b/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java
@@ -79,6 +79,8 @@ public class PingTestPerf extends AsymptoticTestCase //implements TimingControll
*/
private static final String TIMEOUT_PROPNAME = "timeout";
+ private static final String VERBOSE_OUTPUT_PROPNAME = "verbose";
+
/**
* Holds the size of message body to attach to the ping messages.
*/
@@ -158,6 +160,7 @@ public class PingTestPerf extends AsymptoticTestCase //implements TimingControll
setSystemPropertyIfNull(VIRTUAL_PATH_PROPNAME, VIRTUAL_PATH_DEFAULT);
setSystemPropertyIfNull(TIMEOUT_PROPNAME, Long.toString(TIMEOUT_DEFAULT));
setSystemPropertyIfNull(PING_QUEUE_COUNT_PROPNAME, Integer.toString(1));
+ setSystemPropertyIfNull(VERBOSE_OUTPUT_PROPNAME, Boolean.toString(false));
}
/**
@@ -192,7 +195,6 @@ public class PingTestPerf extends AsymptoticTestCase //implements TimingControll
{
// Get the per thread test setup to run the test through.
PerThreadSetup perThreadSetup = threadSetup.get();
-
if (numPings == 0)
{
_logger.error("Number of pings requested was zero.");
@@ -210,12 +212,10 @@ public class PingTestPerf extends AsymptoticTestCase //implements TimingControll
long timeout = Long.parseLong(testParameters.getProperty(TIMEOUT_PROPNAME));
int numReplies = perThreadSetup._pingItselfClient.pingAndWaitForReply(msg, numPings, timeout);
- _logger.info("replies = " + numReplies);
-
// Fail the test if the timeout was exceeded.
if (numReplies != numPings)
{
- Assert.fail("The ping timed out. Messages Sent = " + numPings + ", MessagesReceived = " + numReplies);
+ Assert.fail("The ping timed out after "+ timeout + " ms. Messages Sent = " + numPings + ", MessagesReceived = " + numReplies);
}
}
@@ -226,6 +226,7 @@ public class PingTestPerf extends AsymptoticTestCase //implements TimingControll
//NDC.push(getName());
// Create the test setups on a per thread basis, only if they have not already been created.
+
if (threadSetup.get() == null)
{
PerThreadSetup perThreadSetup = new PerThreadSetup();
@@ -240,7 +241,7 @@ public class PingTestPerf extends AsymptoticTestCase //implements TimingControll
boolean persistent = Boolean.parseBoolean(testParameters.getProperty(PERSISTENT_MODE_PROPNAME));
boolean transacted = Boolean.parseBoolean(testParameters.getProperty(TRANSACTED_PROPNAME));
String selector = null;
- boolean verbose = false;
+ boolean verbose = Boolean.parseBoolean(testParameters.getProperty(VERBOSE_OUTPUT_PROPNAME));
int messageSize = Integer.parseInt(testParameters.getProperty(MESSAGE_SIZE_PROPNAME));
boolean afterCommit = Boolean.parseBoolean(testParameters.getProperty(FAIL_AFTER_COMMIT));
@@ -251,26 +252,17 @@ public class PingTestPerf extends AsymptoticTestCase //implements TimingControll
int batchSize = Integer.parseInt(testParameters.getProperty(BATCH_SIZE));
- // Establish a client to ping a Queue and listen the reply back from same Queue
- if (queueCount > 1)
- {
- // test client with multiple queues
- perThreadSetup._pingItselfClient = new TestPingItself(brokerDetails, username, password, virtualpath,
- selector, transacted, persistent,
- messageSize, verbose,
- afterCommit, beforeCommit, afterSend, beforeSend, failOnce,
- batchSize, queueCount);
- }
- else
+ // This is synchronized because there is a race condition, which causes one connection to sleep if
+ // all threads try to create connection concurrently
+ synchronized(this)
{
// Establish a client to ping a Queue and listen the reply back from same Queue
perThreadSetup._pingItselfClient = new TestPingItself(brokerDetails, username, password, virtualpath,
queueName, selector, transacted, persistent,
messageSize, verbose,
afterCommit, beforeCommit, afterSend, beforeSend, failOnce,
- batchSize);
+ batchSize, queueCount);
}
-
// Start the client connection
perThreadSetup._pingItselfClient.getConnection().start();