summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBhupendra Bhusman Bhardwaj <bhupendrab@apache.org>2007-01-24 11:05:35 +0000
committerBhupendra Bhusman Bhardwaj <bhupendrab@apache.org>2007-01-24 11:05:35 +0000
commit1e3cc7ff9424e74e565640af8afd8563546252f4 (patch)
tree02c3f9801e11d4d970600688b3bfb6d3f1c0e13d
parent15687d45e9a9f19f3b01af2b685f5c2b734bdf16 (diff)
downloadqpid-python-1e3cc7ff9424e74e565640af8afd8563546252f4.tar.gz
updated the test classes to be used with Topics as well as Queues
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@499356 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/client/message/TestMessageFactory.java37
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingClient.java12
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingProducer.java112
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/ping/TestPingItself.java33
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java65
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java141
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/topic/Config.java10
-rw-r--r--java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java32
-rw-r--r--java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java45
9 files changed, 308 insertions, 179 deletions
diff --git a/java/perftests/src/main/java/org/apache/qpid/client/message/TestMessageFactory.java b/java/perftests/src/main/java/org/apache/qpid/client/message/TestMessageFactory.java
index dc2bf39a9b..c0f236b833 100644
--- a/java/perftests/src/main/java/org/apache/qpid/client/message/TestMessageFactory.java
+++ b/java/perftests/src/main/java/org/apache/qpid/client/message/TestMessageFactory.java
@@ -26,6 +26,9 @@ import javax.jms.ObjectMessage;
import javax.jms.StreamMessage;
import javax.jms.BytesMessage;
import javax.jms.TextMessage;
+import javax.jms.Queue;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
public class TestMessageFactory
{
@@ -61,7 +64,39 @@ public class TestMessageFactory
public static ObjectMessage newObjectMessage(Session session, int size) throws JMSException
{
- return session.createObjectMessage(createMessagePayload(size));
+ if (size == 0)
+ {
+ return session.createObjectMessage();
+ }
+ else
+ {
+ return session.createObjectMessage(createMessagePayload(size));
+ }
+ }
+
+ /**
+ * Creates an ObjectMessage with given size and sets the JMS properties (JMSReplyTo and DeliveryMode)
+ * @param session
+ * @param replyDestination
+ * @param size
+ * @param persistent
+ * @return the new ObjectMessage
+ * @throws JMSException
+ */
+ public static ObjectMessage newObjectMessage(Session session, Destination replyDestination, int size, boolean persistent) throws JMSException
+ {
+ ObjectMessage msg = newObjectMessage(session, size);
+
+ // Set the messages persistent delivery flag.
+ msg.setJMSDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+
+ // Ensure that the temporary reply queue is set as the reply to destination for the message.
+ if (replyDestination != null)
+ {
+ msg.setJMSReplyTo(replyDestination);
+ }
+
+ return msg;
}
public static String createMessagePayload(int size)
diff --git a/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingClient.java b/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingClient.java
index c04a8a7d96..da40f73608 100644
--- a/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingClient.java
+++ b/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingClient.java
@@ -29,6 +29,8 @@ public abstract class AbstractPingClient
private static final Logger _logger = Logger.getLogger(TestPingClient.class);
private AMQConnection _connection;
+ /** tells if the test is being done for pubsub or p2p */
+ private boolean _isPubSub = false;
protected boolean _failBeforeCommit = false;
protected boolean _failAfterCommit = false;
@@ -43,6 +45,16 @@ public abstract class AbstractPingClient
this._connection = _connection;
}
+ public void setPubSub(boolean pubsub)
+ {
+ _isPubSub = pubsub;
+ }
+
+ public boolean isPubSub()
+ {
+ return _isPubSub;
+ }
+
/**
* Convenience method to commit the transaction on the session associated with this bounce back client.
*
diff --git a/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingProducer.java b/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingProducer.java
index 34938a64bf..1891c9b556 100644
--- a/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingProducer.java
+++ b/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingProducer.java
@@ -10,16 +10,14 @@ import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.*;
import javax.jms.Connection;
import javax.jms.Message;
-import javax.jms.MessageProducer;
import org.apache.log4j.Logger;
import org.apache.qpid.client.AMQNoConsumersException;
import org.apache.qpid.client.AMQQueue;
-import org.apache.qpid.client.BasicMessageProducer;
+import org.apache.qpid.client.AMQTopic;
import org.apache.qpid.client.message.TestMessageFactory;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.jms.*;
import org.apache.qpid.jms.Session;
/**
@@ -41,6 +39,8 @@ public abstract class AbstractPingProducer implements Runnable, ExceptionListene
{
private static final Logger _logger = Logger.getLogger(AbstractPingProducer.class);
+ /** tells if the test is being done for pubsub or p2p */
+ private boolean _isPubSub = false;
/**
* Used to format time stamping output.
*/
@@ -65,11 +65,12 @@ public abstract class AbstractPingProducer implements Runnable, ExceptionListene
private Session _producerSession;
/**
- * Holds the number of queues the tests will be using to send messages. By default it will be 1
+ * Holds the number of destinations for multiple-destination test. By default it will be 1
*/
- protected int _queueCount = 1;
+ protected int _destinationCount = 1;
- private List<Queue> _queues = new ArrayList<Queue>();
+ /** list of all the destinations for multiple-destinations test */
+ private List<Destination> _destinations = new ArrayList<Destination>();
/**
* Holds the message producer to send the pings through.
@@ -86,6 +87,19 @@ public abstract class AbstractPingProducer implements Runnable, ExceptionListene
protected int _txBatchSize = 1;
/**
+ * Sets the test for pubsub or p2p.
+ * @param value
+ */
+ public void setPubSub(boolean value)
+ {
+ _isPubSub = value;
+ }
+
+ public boolean isPubSub()
+ {
+ return _isPubSub;
+ }
+ /**
* Convenience method for a short pause.
*
* @param sleepTime The time in milliseconds to pause for.
@@ -119,31 +133,11 @@ public abstract class AbstractPingProducer implements Runnable, ExceptionListene
*
* @throws JMSException All underlying JMSException are allowed to fall through.
*/
- public ObjectMessage getTestMessage(Queue replyQueue, int messageSize, boolean persistent) throws JMSException
+ public ObjectMessage getTestMessage(Destination replyQueue, int messageSize, boolean persistent) throws JMSException
{
- ObjectMessage msg;
-
- if (messageSize != 0)
- {
- msg = TestMessageFactory.newObjectMessage(_producerSession, messageSize);
- }
- else
- {
- msg = _producerSession.createObjectMessage();
- }
-
- // Set the messages persistent delivery flag.
- msg.setJMSDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
-
+ ObjectMessage msg = TestMessageFactory.newObjectMessage(_producerSession, replyQueue, messageSize, persistent);
// Timestamp the message.
msg.setLongProperty("timestamp", System.currentTimeMillis());
-
- // Ensure that the temporary reply queue is set as the reply to destination for the message.
- if (replyQueue != null)
- {
- msg.setJMSReplyTo(replyQueue);
- }
-
return msg;
}
@@ -217,14 +211,14 @@ public abstract class AbstractPingProducer implements Runnable, ExceptionListene
this._producerSession = session;
}
- public int getQueueCount()
+ public int getDestinationsCount()
{
- return _queueCount;
+ return _destinationCount;
}
- public void setQueueCount(int queueCount)
+ public void setDestinationsCount(int count)
{
- this._queueCount = queueCount;
+ this._destinationCount = count;
}
protected void commitTx() throws JMSException
@@ -233,31 +227,57 @@ public abstract class AbstractPingProducer implements Runnable, ExceptionListene
}
/**
- * Creates queues dynamically and adds to the queues list. This is when the test is being done with
- * multiple queues.
- *
- * @param queueCount
+ * Creates destinations dynamically and adds to the destinations list for multiple-destinations test
+ * @param count
*/
- protected void createQueues(int queueCount)
+ protected void createDestinations(int count)
+ {
+ if (isPubSub())
+ {
+ createTopics(count);
+ }
+ else
+ {
+ createQueues(count);
+ }
+ }
+
+ private void createQueues(int count)
{
- for (int i = 0; i < queueCount; i++)
+ for (int i = 0; i < count; i++)
{
AMQShortString name =
- new AMQShortString("Queue_" + _queueSequenceID.incrementAndGet() + "_" + System.currentTimeMillis());
+ new AMQShortString("AMQQueue_" + _queueSequenceID.incrementAndGet() + "_" + System.currentTimeMillis());
AMQQueue queue = new AMQQueue(name, name, false, false, false);
- _queues.add(queue);
+ _destinations.add(queue);
}
}
- protected Queue getQueue(int index)
+ private void createTopics(int count)
{
- return _queues.get(index);
+ for (int i = 0; i < count; i++)
+ {
+ AMQShortString name =
+ new AMQShortString("AMQTopic_" + _queueSequenceID.incrementAndGet() + "_" + System.currentTimeMillis());
+ AMQTopic topic = new AMQTopic(name);
+
+ _destinations.add(topic);
+ }
+ }
+
+ /**
+ * Returns the destination from the destinations list with given index. This is for multiple-destinations test
+ * @param index
+ * @return Destination with given index
+ */
+ protected Destination getDestination(int index)
+ {
+ return _destinations.get(index);
}
/**
* Convenience method to commit the transaction on the session associated with this pinger.
- *
* @throws javax.jms.JMSException If the commit fails and then the rollback fails.
*/
protected void commitTx(Session session) throws JMSException
@@ -336,7 +356,7 @@ public abstract class AbstractPingProducer implements Runnable, ExceptionListene
sendMessage(null, message);
}
- protected void sendMessage(Queue q, Message message) throws JMSException
+ protected void sendMessage(Destination destination, Message message) throws JMSException
{
if (_failBeforeSend)
{
@@ -349,13 +369,13 @@ public abstract class AbstractPingProducer implements Runnable, ExceptionListene
doFailover();
}
- if (q == null)
+ if (destination == null)
{
_producer.send(message);
}
else
{
- _producer.send(q, message);
+ _producer.send(destination, message);
}
commitTx();
diff --git a/java/perftests/src/main/java/org/apache/qpid/ping/TestPingItself.java b/java/perftests/src/main/java/org/apache/qpid/ping/TestPingItself.java
index 9a2e2c7e8c..786aaa1e08 100644
--- a/java/perftests/src/main/java/org/apache/qpid/ping/TestPingItself.java
+++ b/java/perftests/src/main/java/org/apache/qpid/ping/TestPingItself.java
@@ -38,8 +38,8 @@ public class TestPingItself extends PingPongProducer
private static final Logger _logger = Logger.getLogger(TestPingItself.class);
/**
- * If queueCount is <= 1 : There will be one Queue and one consumer instance for the test
- * If queueCount is > 1 : This creats a client for tests with multiple queues. Creates as many consumer instances
+ * If noOfDestinations is <= 1 : There will be one Queue and one consumer instance for the test
+ * If noOfDestinations is > 1 : This creats a client for tests with multiple queues. Creates as many consumer instances
* as there are queues, each listening to a Queue. A producer is created which picks up a queue from
* the list of queues to send message
*
@@ -59,20 +59,21 @@ public class TestPingItself extends PingPongProducer
* @param beforeSend
* @param failOnce
* @param batchSize
- * @param queueCount
+ * @param noOfDestinations
* @throws Exception
*/
public TestPingItself(String brokerDetails, String username, String password, String virtualpath, String queueName,
String selector, boolean transacted, boolean persistent, int messageSize, boolean verbose,
boolean afterCommit, boolean beforeCommit, boolean afterSend, boolean beforeSend, boolean failOnce,
- int batchSize, int queueCount, int rate) throws Exception
+ int batchSize, int noOfDestinations, int rate, boolean pubsub) throws Exception
{
- super(brokerDetails, username, password, virtualpath, queueName, selector, transacted, persistent, messageSize,
- verbose, afterCommit, beforeCommit, afterSend, beforeSend, failOnce, batchSize, queueCount, rate);
+ super(brokerDetails, username, password, virtualpath, queueName, selector, transacted, persistent,
+ messageSize, verbose, afterCommit, beforeCommit, afterSend, beforeSend, failOnce, batchSize,
+ noOfDestinations, rate, pubsub);
- if (queueCount > 1)
+ if (noOfDestinations > 1)
{
- createQueues(queueCount);
+ createDestinations(noOfDestinations);
_persistent = persistent;
_messageSize = messageSize;
@@ -83,17 +84,16 @@ public class TestPingItself extends PingPongProducer
}
}
-
- /**
+ /**
* Sets the replyQueue to be the same as ping queue.
*/
@Override
public void createConsumer(String selector) throws JMSException
{
// Create a message consumer to get the replies with and register this to be called back by it.
- setReplyQueue(getPingQueue());
+ setReplyDestination(getPingDestination());
MessageConsumer consumer =
- getConsumerSession().createConsumer(getReplyQueue(), PREFETCH, false, EXCLUSIVE, selector);
+ getConsumerSession().createConsumer(getReplyDestination(), PREFETCH, false, EXCLUSIVE, selector);
consumer.setMessageListener(this);
}
@@ -123,6 +123,7 @@ public class TestPingItself extends PingPongProducer
int queueCount = (config.getQueueCount() != 0) ? config.getQueueCount() : 1;
int batchSize = (config.getBatchSize() != 0) ? config.getBatchSize() : BATCH_SIZE;
int rate = (config.getRate() != 0) ? config.getRate() : 0;
+ boolean pubsub = config.isPubSub();
String queue = "ping_" + System.currentTimeMillis();
_logger.info("Queue:" + queue + ", Transacted:" + transacted + ", persistent:" + persistent + ",MessageSize:"
@@ -169,9 +170,9 @@ public class TestPingItself extends PingPongProducer
// Create a ping producer to handle the request/wait/reply cycle.
TestPingItself pingItself = new TestPingItself(brokerDetails, "guest", "guest", virtualpath, queue, null,
- transacted, persistent, messageSize, verbose,
- afterCommit, beforeCommit, afterSend, beforeSend, failOnce,
- batchSize, queueCount, rate);
+ transacted, persistent, messageSize, verbose, afterCommit,
+ beforeCommit, afterSend, beforeSend, failOnce, batchSize,
+ queueCount, rate, pubsub);
pingItself.getConnection().start();
@@ -214,6 +215,4 @@ public class TestPingItself extends PingPongProducer
+ "-messages : no of messages to be sent (if 0, the ping loop will run indefinitely)");
System.exit(0);
}
-
-
}
diff --git a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java
index a7ad4b91b8..3b4572d1b3 100644
--- a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java
+++ b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java
@@ -29,6 +29,7 @@ import org.apache.log4j.Logger;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQTopic;
import org.apache.qpid.jms.ConnectionListener;
import org.apache.qpid.jms.Session;
import org.apache.qpid.ping.AbstractPingClient;
@@ -75,6 +76,8 @@ public class PingPongBouncer extends AbstractPingClient implements MessageListen
/** Determines whether this bounce back client bounces back messages persistently. */
private boolean _persistent = false;
+ private Destination _consumerDestination;
+
/** Keeps track of the response destination of the previous message for the last reply to producer cache. */
private Destination _lastResponseDest;
@@ -91,24 +94,28 @@ public class PingPongBouncer extends AbstractPingClient implements MessageListen
* Creates a PingPongBouncer on the specified producer and consumer sessions.
*
* @param brokerDetails The addresses of the brokers to connect to.
- * @param username The broker username.
- * @param password The broker password.
- * @param virtualpath The virtual host name within the broker.
- * @param queueName The name of the queue to receive pings on (or root of the queue name where many queues are generated).
- * @param persistent A flag to indicate that persistent message should be used.
- * @param transacted A flag to indicate that pings should be sent within transactions.
- * @param selector A message selector to filter received pings with.
- * @param verbose A flag to indicate that message timings should be sent to the console.
+ * @param username The broker username.
+ * @param password The broker password.
+ * @param virtualpath The virtual host name within the broker.
+ * @param destinationName The name of the queue to receive pings on
+ * (or root of the queue name where many queues are generated).
+ * @param persistent A flag to indicate that persistent message should be used.
+ * @param transacted A flag to indicate that pings should be sent within transactions.
+ * @param selector A message selector to filter received pings with.
+ * @param verbose A flag to indicate that message timings should be sent to the console.
*
* @throws Exception All underlying exceptions allowed to fall through. This is only test code...
*/
- public PingPongBouncer(String brokerDetails, String username, String password, String virtualpath, String queueName,
- boolean persistent, boolean transacted, String selector, boolean verbose) throws Exception
+ public PingPongBouncer(String brokerDetails, String username, String password, String virtualpath,
+ String destinationName, boolean persistent, boolean transacted, String selector,
+ boolean verbose, boolean pubsub) throws Exception
{
// Create a client id to uniquely identify this client.
InetAddress address = InetAddress.getLocalHost();
String clientId = address.getHostName() + System.currentTimeMillis();
-
+ _verbose = verbose;
+ _persistent = persistent;
+ setPubSub(pubsub);
// Connect to the broker.
setConnection(new AMQConnection(brokerDetails, username, password, clientId, virtualpath));
_logger.info("Connected with URL:" + getConnection().toURL());
@@ -122,21 +129,30 @@ public class PingPongBouncer extends AbstractPingClient implements MessageListen
_producerSession = (Session) getConnection().createSession(transacted, Session.AUTO_ACKNOWLEDGE);
// Create the queue to listen for message on.
- Queue q = new AMQQueue(queueName);
- MessageConsumer consumer = _consumerSession.createConsumer(q, PREFETCH, NO_LOCAL, EXCLUSIVE, selector);
+ createConsumerDestination(destinationName);
+ MessageConsumer consumer = _consumerSession.createConsumer(_consumerDestination, PREFETCH, NO_LOCAL, EXCLUSIVE, selector);
// Create a producer for the replies, without a default destination.
_replyProducer = _producerSession.createProducer(null);
_replyProducer.setDisableMessageTimestamp(true);
_replyProducer.setDeliveryMode(_persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
- _verbose = verbose;
- _persistent = persistent;
-
// Set this up to listen for messages on the queue.
consumer.setMessageListener(this);
}
+ private void createConsumerDestination(String name)
+ {
+ if (isPubSub())
+ {
+ _consumerDestination = new AMQTopic(name);
+ }
+ else
+ {
+ _consumerDestination = new AMQQueue(name);
+ }
+ }
+
/**
* Starts a stand alone ping-pong client running in verbose mode.
*
@@ -149,8 +165,9 @@ public class PingPongBouncer extends AbstractPingClient implements MessageListen
// Display help on the command line.
if (args.length < 5)
{
- System.err.println("Usage: <brokerdetails> <username> <password> <virtual-path> <serviceQueue> "
- + "[<P[ersistent]|N[onPersistent]> <T[ransacted]|N<onTransacted]>] [selector]");
+ System.err.println("Usage: <brokerdetails> <username> <password> <virtual-path> <serviceQueue> " +
+ "[<P[ersistent]|N[onPersistent]> <T[ransacted]|N<onTransacted]>] " +
+ "[selector] [pubsub(true/false)]");
System.exit(1);
}
@@ -160,14 +177,15 @@ public class PingPongBouncer extends AbstractPingClient implements MessageListen
String password = args[2];
String virtualpath = args[3];
String queueName = args[4];
- boolean persistent = ((args.length >= 6) && (args[5].toUpperCase().charAt(0) == 'P'));
- boolean transacted = ((args.length >= 7) && (args[6].toUpperCase().charAt(0) == 'T'));
- String selector = (args.length == 8) ? args[5] : null;
+ boolean persistent = ((args.length > 5) && (args[5].toUpperCase().charAt(0) == 'P'));
+ boolean transacted = ((args.length > 6) && (args[6].toUpperCase().charAt(0) == 'T'));
+ String selector = (args.length > 7) ? args[7] : null;
+ boolean pubsub = (args.length > 8) ? Boolean.parseBoolean(args[8]) : false;
// Instantiate the ping pong client with the command line options and start it running.
PingPongBouncer pingBouncer =
- new PingPongBouncer(brokerDetails, username, password, virtualpath, queueName, persistent, transacted, selector,
- true);
+ new PingPongBouncer(brokerDetails, username, password, virtualpath, queueName, persistent, transacted,
+ selector, true, pubsub);
pingBouncer.getConnection().start();
System.out.println("Waiting...");
@@ -185,7 +203,6 @@ public class PingPongBouncer extends AbstractPingClient implements MessageListen
try
{
String messageCorrelationId = message.getJMSCorrelationID();
-
if (_verbose)
{
_logger.info(timestampFormatter.format(new Date()) + ": Got ping with correlation id, "
diff --git a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
index e68c988d08..2c27b48c88 100644
--- a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
+++ b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
@@ -34,6 +34,7 @@ import org.apache.log4j.Logger;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQTopic;
import org.apache.qpid.jms.ConnectionListener;
import org.apache.qpid.jms.MessageProducer;
import org.apache.qpid.jms.Session;
@@ -85,9 +86,9 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable,
protected static final long TIMEOUT = 9000;
/**
- * Holds the name of the queue to send pings on.
+ * Holds the name of the destination to send pings on.
*/
- protected static final String PING_QUEUE_NAME = "ping";
+ protected static final String PING_DESTINATION_NAME = "ping";
/**
* The batch size.
@@ -114,14 +115,14 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable,
private static Map<String, CountDownLatch> trafficLights = new HashMap<String, CountDownLatch>();
/**
- * Holds the queue to send the ping replies to.
+ * Destination where the responses messages will arrive
*/
- private Queue _replyQueue;
+ private Destination _replyDestination;
/**
- * Hold the known Queue where the producer will be sending message to
+ * Destination where the producer will be sending message to
*/
- private Queue _pingQueue;
+ private Destination _pingDestination;
/**
* Determines whether this producer sends persistent messages from the run method.
@@ -213,20 +214,23 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable,
* @param transacted
* @throws Exception All allowed to fall through. This is only test code...
*/
- public PingPongProducer(String brokerDetails, String username, String password, String virtualpath, String queueName,
- String selector, boolean transacted, boolean persistent, int messageSize, boolean verbose,
- boolean afterCommit, boolean beforeCommit, boolean afterSend, boolean beforeSend,
- boolean failOnce, int batchSize, int queueCount, int rate) throws Exception
+ public PingPongProducer(String brokerDetails, String username, String password, String virtualpath,
+ String destinationName, String selector, boolean transacted, boolean persistent,
+ int messageSize, boolean verbose, boolean afterCommit, boolean beforeCommit,
+ boolean afterSend, boolean beforeSend, boolean failOnce, int batchSize,
+ int noOfDestinations, int rate, boolean pubsub) throws Exception
{
this(brokerDetails, username, password, virtualpath, transacted, persistent, messageSize, verbose, afterCommit,
beforeCommit, afterSend, beforeSend, failOnce, batchSize, rate);
- _queueCount = queueCount;
- if (queueCount <= 1)
+ _destinationCount = noOfDestinations;
+ setPubSub(pubsub);
+
+ if (noOfDestinations <= 1)
{
- if (queueName != null)
+ if (destinationName != null)
{
- _pingQueue = new AMQQueue(queueName);
+ createPingDestination(destinationName);
// Create producer and the consumer
createProducer();
createConsumer(selector);
@@ -239,57 +243,76 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable,
}
}
+ private void createPingDestination(String name)
+ {
+ if (isPubSub())
+ {
+ _pingDestination = new AMQTopic(name);
+ }
+ else
+ {
+ _pingDestination = new AMQQueue(name);
+ }
+ }
+
/**
- * Creates the producer to send the pings on. If the tests are with nultiple queues, then producer
+ * Creates the producer to send the pings on. If the tests are with nultiple-destinations, then producer
* is created with null destination, so that any destination can be specified while sending
- *
* @throws JMSException
*/
public void createProducer() throws JMSException
{
- if (getQueueCount() > 1)
+ if (getDestinationsCount() > 1)
{
- // create producer with initial destination as null for test with multiple queues
+ // create producer with initial destination as null for test with multiple-destinations
// In this case, a different destination will be used while sending the message
_producer = (MessageProducer) getProducerSession().createProducer(null);
}
else
{
- // Create a queue and producer to send the pings on.
- _producer = (MessageProducer) getProducerSession().createProducer(_pingQueue);
+ // Create a producer with known destination to send the pings on.
+ _producer = (MessageProducer) getProducerSession().createProducer(_pingDestination);
}
+
_producer.setDisableMessageTimestamp(true);
_producer.setDeliveryMode(_persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
}
/**
- * Creates the temporary queue to listen to the responses
- *
+ * Creates the temporary destination to listen to the responses
* @param selector
* @throws JMSException
*/
public void createConsumer(String selector) throws JMSException
{
- // Create a temporary queue to get the pongs on.
- _replyQueue = _consumerSession.createTemporaryQueue();
+ // Create a temporary destination to get the pongs on.
+ if (isPubSub())
+ {
+ _replyDestination = _consumerSession.createTemporaryTopic();
+ }
+ else
+ {
+ _replyDestination = _consumerSession.createTemporaryQueue();
+ }
// Create a message consumer to get the replies with and register this to be called back by it.
- MessageConsumer consumer = _consumerSession.createConsumer(_replyQueue, PREFETCH, NO_LOCAL, EXCLUSIVE, selector);
+ MessageConsumer consumer = _consumerSession.createConsumer(_replyDestination, PREFETCH, NO_LOCAL, EXCLUSIVE, selector);
consumer.setMessageListener(this);
}
/**
- * Creates consumer instances for each queue. This is used when test is being done with multiple queues.
- *
+ * Creates consumer instances for each destination. This is used when test is being done with multiple destinations.
+ *
* @param selector
* @throws JMSException
*/
public void createConsumers(String selector) throws JMSException
{
- for (int i = 0; i < getQueueCount(); i++)
+ for (int i = 0; i < getDestinationsCount(); i++)
{
- MessageConsumer consumer = getConsumerSession().createConsumer(getQueue(i), PREFETCH, false, EXCLUSIVE, selector);
+ MessageConsumer consumer =
+ getConsumerSession().createConsumer(getDestination(i), PREFETCH, false, EXCLUSIVE, selector);
consumer.setMessageListener(this);
}
}
@@ -300,14 +323,14 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable,
return _consumerSession;
}
- public Queue getPingQueue()
+ public Destination getPingDestination()
{
- return _pingQueue;
+ return _pingDestination;
}
- protected void setPingQueue(Queue queue)
+ protected void setPingDestination(Destination destination)
{
- _pingQueue = queue;
+ _pingDestination = destination;
}
/**
@@ -329,9 +352,9 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable,
// Extract the command line.
if (args.length < 2)
{
- System.err.println(
- "Usage: TestPingPublisher <brokerDetails> <virtual path> [verbose (true/false)] "
- + "[transacted (true/false)] [persistent (true/false)] [message size in bytes] [batchsize] [rate]");
+ System.err.println("Usage: TestPingPublisher <brokerDetails> <virtual path> [verbose (true/false)] " +
+ "[transacted (true/false)] [persistent (true/false)] [message size in bytes] [batchsize]" +
+ " [rate] [pubsub(true/false)]");
System.exit(0);
}
@@ -343,6 +366,7 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable,
int messageSize = (args.length >= 6) ? Integer.parseInt(args[5]) : DEFAULT_MESSAGE_SIZE;
int batchSize = (args.length >= 7) ? Integer.parseInt(args[6]) : 1;
int rate = (args.length >= 8) ? Integer.parseInt(args[7]) : 0;
+ boolean ispubsub = (args.length >= 9) ? Boolean.parseBoolean(args[8]) : false;
boolean afterCommit = false;
boolean beforeCommit = false;
@@ -383,21 +407,21 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable,
}
// Create a ping producer to handle the request/wait/reply cycle.
- PingPongProducer pingProducer = new PingPongProducer(brokerDetails, "guest", "guest", virtualpath, PING_QUEUE_NAME, null, transacted,
- persistent, messageSize, verbose,
- afterCommit, beforeCommit, afterSend, beforeSend, failOnce,
- batchSize, 0, rate);
+ PingPongProducer pingProducer = new PingPongProducer(brokerDetails, "guest", "guest", virtualpath,
+ PING_DESTINATION_NAME, null, transacted, persistent, messageSize, verbose,
+ afterCommit, beforeCommit, afterSend, beforeSend, failOnce, batchSize,
+ 0, rate, ispubsub);
pingProducer.getConnection().start();
// Run a few priming pings to remove warm up time from test results.
- pingProducer.prime(PRIMING_LOOPS);
+ //pingProducer.prime(PRIMING_LOOPS);
// Create a shutdown hook to terminate the ping-pong producer.
Runtime.getRuntime().addShutdownHook(pingProducer.getShutdownHook());
// Ensure that the ping pong producer is registered to listen for exceptions on the connection too.
pingProducer.getConnection().setExceptionListener(pingProducer);
-
+
// Create the ping loop thread and run it until it is terminated by the shutdown hook or exception.
Thread pingThread = new Thread(pingProducer);
pingThread.run();
@@ -405,13 +429,6 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable,
}
/**
- * Creates consumer instances for each queue. This is used when test is being done with multiple queues.
- *
- * @param selector
- * @throws JMSException
- */
-
- /**
* Primes the test loop by sending a few messages, then introduces a short wait. This allows the bounce back client
* on the other end a chance to configure its reply producer on the reply to destination. It is also worth calling
* this a few times, in order to prime the JVMs JIT compilation.
@@ -424,8 +441,7 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable,
for (int i = 0; i < x; i++)
{
// Create and send a small message.
- Message first = getTestMessage(_replyQueue, 0, false);
-
+ Message first = getTestMessage(_replyDestination, 0, false);
sendMessage(first);
try
@@ -434,6 +450,7 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable,
}
catch (InterruptedException ignore)
{
+
}
}
}
@@ -524,10 +541,11 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable,
// Re-timestamp the message.
message.setLongProperty("timestamp", System.currentTimeMillis());
- // Check if the test is with multiple queues, in which case round robin the queues as the messages are sent.
- if (getQueueCount() > 1)
+ // Check if the test is with multiple-destinations, in which case round robin the destinations
+ // as the messages are sent.
+ if (getDestinationsCount() > 1)
{
- sendMessage(getQueue(i % getQueueCount()), message);
+ sendMessage(getDestination(i % getDestinationsCount()), message);
}
else
{
@@ -609,7 +627,7 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable,
try
{
// Generate a sample message and time stamp it.
- ObjectMessage msg = getTestMessage(_replyQueue, _messageSize, _persistent);
+ ObjectMessage msg = getTestMessage(_replyDestination, _messageSize, _persistent);
msg.setLongProperty("timestamp", System.currentTimeMillis());
// Send the message and wait for a reply.
@@ -630,15 +648,14 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable,
}
}
- public Queue getReplyQueue()
+ public Destination getReplyDestination()
{
- return _replyQueue;
+ return _replyDestination;
}
-
- protected void setReplyQueue(Queue queue)
+ protected void setReplyDestination(Destination destination)
{
- _replyQueue = queue;
+ _replyDestination = destination;
}
/*
@@ -657,10 +674,10 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable,
// Re-timestamp the message.
message.setLongProperty("timestamp", System.currentTimeMillis());
- sendMessage(getQueue(queueIndex++), message);
+ sendMessage(getDestination(queueIndex++), message);
// reset the counter to get the first queue
- if (queueIndex == (getQueueCount() - 1))
+ if (queueIndex == (getDestinationsCount() - 1))
{
queueIndex = 0;
}
diff --git a/java/perftests/src/main/java/org/apache/qpid/topic/Config.java b/java/perftests/src/main/java/org/apache/qpid/topic/Config.java
index 6af7929edc..6ecb15155c 100644
--- a/java/perftests/src/main/java/org/apache/qpid/topic/Config.java
+++ b/java/perftests/src/main/java/org/apache/qpid/topic/Config.java
@@ -48,6 +48,7 @@ public class Config extends AbstractConfig implements ConnectorConfig
private int noOfQueues;
private int batchSize;
private int rate;
+ private boolean ispubsub;
public Config()
{
@@ -178,6 +179,11 @@ public class Config extends AbstractConfig implements ConnectorConfig
return transacted;
}
+ public boolean isPubSub()
+ {
+ return ispubsub;
+ }
+
public void setOption(String key, String value)
{
if("-host".equalsIgnoreCase(key))
@@ -255,6 +261,10 @@ public class Config extends AbstractConfig implements ConnectorConfig
{
rate = parseInt("MEssage rate", value);
}
+ else if("-pubsub".equalsIgnoreCase(key))
+ {
+ ispubsub = "true".equalsIgnoreCase(value);
+ }
else
{
System.out.println("Ignoring unrecognised option: " + key);
diff --git a/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java b/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java
index 402d72d6db..88a791ecb3 100644
--- a/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java
+++ b/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java
@@ -42,17 +42,17 @@ public class PingTestPerf extends AsymptoticTestCase //implements TimingControll
/**
* Holds the name of the property to get the test message size from.
*/
- protected static final String MESSAGE_SIZE_PROPNAME = "messageSize";
+ private static final String MESSAGE_SIZE_PROPNAME = "messagesize";
/**
* Holds the name of the property to get the ping queue name from.
*/
- protected static final String PING_QUEUE_NAME_PROPNAME = "pingQueue";
+ private static final String PING_DESTINATION_NAME_PROPNAME = "destinationname";
/**
* holds the queue count, if the test is being performed with multiple queues
*/
- protected static final String PING_QUEUE_COUNT_PROPNAME = "queues";
+ private static final String PING_DESTINATION_COUNT_PROPNAME = "destinationscount";
/**
* Holds the name of the property to get the test delivery mode from.
@@ -84,6 +84,8 @@ public class PingTestPerf extends AsymptoticTestCase //implements TimingControll
protected static final String VERBOSE_OUTPUT_PROPNAME = "verbose";
+ /** Holds the true or false depending on wether it is P2P test or PubSub */
+ private static final String IS_PUBSUB_PROPNAME = "pubsub";
/**
* Holds the size of message body to attach to the ping messages.
*/
@@ -95,7 +97,7 @@ public class PingTestPerf extends AsymptoticTestCase //implements TimingControll
/**
* Holds the name of the queue to which pings are sent.
*/
- protected static final String PING_QUEUE_NAME_DEFAULT = "ping";
+ private static final String PING_DESTINATION_NAME_DEFAULT = "ping";
/**
* Holds the message delivery mode to use for the test.
@@ -138,6 +140,8 @@ public class PingTestPerf extends AsymptoticTestCase //implements TimingControll
*/
ThreadLocal<PerThreadSetup> threadSetup = new ThreadLocal<PerThreadSetup>();
+ Object _lock = new Object();
+
// Set up a property reader to extract the test parameters from. Once ContextualProperties is available in
// the project dependencies, use it to get property overrides for configurable tests and to notify the test runner
// of the test parameters to log with the results.
@@ -158,15 +162,16 @@ public class PingTestPerf extends AsymptoticTestCase //implements TimingControll
setSystemPropertyIfNull(BATCH_SIZE, Integer.toString(BATCH_SIZE_DEFAULT));
setSystemPropertyIfNull(COMMIT_BATCH_SIZE, Integer.toString(COMMIT_BATCH_SIZE_DEFAULT));
setSystemPropertyIfNull(MESSAGE_SIZE_PROPNAME, Integer.toString(MESSAGE_SIZE_DEFAULT));
- setSystemPropertyIfNull(PING_QUEUE_NAME_PROPNAME, PING_QUEUE_NAME_DEFAULT);
+ setSystemPropertyIfNull(PING_DESTINATION_NAME_PROPNAME, PING_DESTINATION_NAME_DEFAULT);
setSystemPropertyIfNull(PERSISTENT_MODE_PROPNAME, Boolean.toString(PERSISTENT_MODE_DEFAULT));
setSystemPropertyIfNull(TRANSACTED_PROPNAME, Boolean.toString(TRANSACTED_DEFAULT));
setSystemPropertyIfNull(BROKER_PROPNAME, BROKER_DEFAULT);
setSystemPropertyIfNull(VIRTUAL_PATH_PROPNAME, VIRTUAL_PATH_DEFAULT);
setSystemPropertyIfNull(TIMEOUT_PROPNAME, Long.toString(TIMEOUT_DEFAULT));
- setSystemPropertyIfNull(PING_QUEUE_COUNT_PROPNAME, Integer.toString(1));
+ setSystemPropertyIfNull(PING_DESTINATION_COUNT_PROPNAME, Integer.toString(1));
setSystemPropertyIfNull(VERBOSE_OUTPUT_PROPNAME, Boolean.toString(false));
setSystemPropertyIfNull(RATE_PROPNAME, Integer.toString(RATE_DEFAULT));
+ setSystemPropertyIfNull(IS_PUBSUB_PROPNAME, Boolean.toString(false));
}
/**
@@ -244,14 +249,15 @@ public class PingTestPerf extends AsymptoticTestCase //implements TimingControll
String username = "guest";
String password = "guest";
String virtualpath = testParameters.getProperty(VIRTUAL_PATH_PROPNAME);
- int queueCount = Integer.parseInt(testParameters.getProperty(PING_QUEUE_COUNT_PROPNAME));
- String queueName = testParameters.getProperty(PING_QUEUE_NAME_PROPNAME);
+ int destinationscount = Integer.parseInt(testParameters.getProperty(PING_DESTINATION_COUNT_PROPNAME));
+ String destinationname = testParameters.getProperty(PING_DESTINATION_NAME_PROPNAME);
boolean persistent = Boolean.parseBoolean(testParameters.getProperty(PERSISTENT_MODE_PROPNAME));
boolean transacted = Boolean.parseBoolean(testParameters.getProperty(TRANSACTED_PROPNAME));
String selector = null;
boolean verbose = Boolean.parseBoolean(testParameters.getProperty(VERBOSE_OUTPUT_PROPNAME));
int messageSize = Integer.parseInt(testParameters.getProperty(MESSAGE_SIZE_PROPNAME));
int rate = Integer.parseInt(testParameters.getProperty(RATE_PROPNAME));
+ boolean pubsub = Boolean.parseBoolean(testParameters.getProperty(IS_PUBSUB_PROPNAME));
boolean afterCommit = Boolean.parseBoolean(testParameters.getProperty(FAIL_AFTER_COMMIT));
boolean beforeCommit = Boolean.parseBoolean(testParameters.getProperty(FAIL_BEFORE_COMMIT));
@@ -263,14 +269,14 @@ public class PingTestPerf extends AsymptoticTestCase //implements TimingControll
// This is synchronized because there is a race condition, which causes one connection to sleep if
// all threads try to create connection concurrently
- synchronized (this)
+ synchronized (_lock)
{
- // Establish a client to ping a Queue and listen the reply back from same Queue
+ // Establish a client to ping a Destination and listen the reply back from same Destination
perThreadSetup._pingItselfClient = new TestPingItself(brokerDetails, username, password, virtualpath,
- queueName, selector, transacted, persistent,
+ destinationname, selector, transacted, persistent,
messageSize, verbose, afterCommit, beforeCommit,
- afterSend, beforeSend, failOnce, batchSize, queueCount,
- rate);
+ afterSend, beforeSend, failOnce, batchSize, destinationscount,
+ rate, pubsub);
}
// Start the client connection
perThreadSetup._pingItselfClient.getConnection().start();
diff --git a/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java b/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java
index 1252871d2c..fca133c425 100644
--- a/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java
+++ b/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java
@@ -1,6 +1,5 @@
package org.apache.qpid.requestreply;
-import java.net.InetAddress;
import java.util.Properties;
import javax.jms.*;
@@ -10,7 +9,6 @@ import junit.framework.Test;
import junit.framework.TestSuite;
import org.apache.log4j.Logger;
-import org.apache.log4j.NDC;
import uk.co.thebadgerset.junit.extensions.AsymptoticTestCase;
@@ -47,12 +45,12 @@ public class PingPongTestPerf extends AsymptoticTestCase //implements TimingCont
/**
* Holds the name of the property to get the test message size from.
*/
- private static final String MESSAGE_SIZE_PROPNAME = "messageSize";
+ private static final String MESSAGE_SIZE_PROPNAME = "messagesize";
/**
* Holds the name of the property to get the ping queue name from.
*/
- private static final String PING_QUEUE_NAME_PROPNAME = "pingQueue";
+ private static final String PING_QUEUE_NAME_PROPNAME = "destinationname";
/**
* Holds the name of the property to get the test delivery mode from.
@@ -79,6 +77,8 @@ public class PingPongTestPerf extends AsymptoticTestCase //implements TimingCont
*/
private static final int MESSAGE_SIZE_DEFAULT = 0;
+ private static final int BATCH_SIZE_DEFAULT = 2;
+
/**
* Holds the name of the queue to which pings are sent.
*/
@@ -112,6 +112,11 @@ public class PingPongTestPerf extends AsymptoticTestCase //implements TimingCont
/** Holds the name of the property to get the message rate from. */
private static final String RATE_PROPNAME = "rate";
+ private static final String VERBOSE_OUTPUT_PROPNAME = "verbose";
+
+ /** Holds the true or false depending on wether it is P2P test or PubSub */
+ private static final String IS_PUBSUB_PROPNAME = "pubsub";
+
/** Holds the default rate. A value of zero means infinity, only values of 1 or greater are meaningfull. */
private static final int RATE_DEFAULT = 0;
@@ -126,6 +131,7 @@ public class PingPongTestPerf extends AsymptoticTestCase //implements TimingCont
* Thread local to hold the per-thread test setup fields.
*/
ThreadLocal<PerThreadSetup> threadSetup = new ThreadLocal<PerThreadSetup>();
+ Object _lock = new Object();
// Set up a property reader to extract the test parameters from. Once ContextualProperties is available in
// the project dependencies, use it to get property overrides for configurable tests and to notify the test runner
@@ -138,13 +144,16 @@ public class PingPongTestPerf extends AsymptoticTestCase //implements TimingCont
super(name);
// Sets up the test parameters with defaults.
+ setSystemPropertyIfNull(BATCH_SIZE, Integer.toString(BATCH_SIZE_DEFAULT));
setSystemPropertyIfNull(MESSAGE_SIZE_PROPNAME, Integer.toString(MESSAGE_SIZE_DEFAULT));
setSystemPropertyIfNull(PING_QUEUE_NAME_PROPNAME, PING_QUEUE_NAME_DEFAULT);
setSystemPropertyIfNull(PERSISTENT_MODE_PROPNAME, Boolean.toString(PERSISTENT_MODE_DEFAULT));
setSystemPropertyIfNull(TRANSACTED_PROPNAME, Boolean.toString(TRANSACTED_DEFAULT));
setSystemPropertyIfNull(BROKER_PROPNAME, BROKER_DEFAULT);
setSystemPropertyIfNull(VIRTUAL_PATH_PROPNAME, VIRTUAL_PATH_DEFAULT);
+ setSystemPropertyIfNull(VERBOSE_OUTPUT_PROPNAME, Boolean.toString(false));
setSystemPropertyIfNull(RATE_PROPNAME, Integer.toString(RATE_DEFAULT));
+ setSystemPropertyIfNull(IS_PUBSUB_PROPNAME, Boolean.toString(false));
}
/**
@@ -176,7 +185,7 @@ public class PingPongTestPerf extends AsymptoticTestCase //implements TimingCont
// Generate a sample message. This message is already time stamped and has its reply-to destination set.
ObjectMessage msg =
- perThreadSetup._testPingProducer.getTestMessage(perThreadSetup._testPingProducer.getReplyQueue(),
+ perThreadSetup._testPingProducer.getTestMessage(perThreadSetup._testPingProducer.getReplyDestination(),
Integer.parseInt(testParameters.getProperty(
MESSAGE_SIZE_PROPNAME)),
Boolean.parseBoolean(testParameters.getProperty(
@@ -217,9 +226,10 @@ public class PingPongTestPerf extends AsymptoticTestCase //implements TimingCont
boolean persistent = Boolean.parseBoolean(testParameters.getProperty(PERSISTENT_MODE_PROPNAME));
boolean transacted = Boolean.parseBoolean(testParameters.getProperty(TRANSACTED_PROPNAME));
String selector = null;
- boolean verbose = false;
+ boolean verbose = Boolean.parseBoolean(testParameters.getProperty(VERBOSE_OUTPUT_PROPNAME));
int messageSize = Integer.parseInt(testParameters.getProperty(MESSAGE_SIZE_PROPNAME));
int rate = Integer.parseInt(testParameters.getProperty(RATE_PROPNAME));
+ boolean pubsub = Boolean.parseBoolean(testParameters.getProperty(IS_PUBSUB_PROPNAME));
boolean afterCommit = Boolean.parseBoolean(testParameters.getProperty(FAIL_AFTER_COMMIT));
boolean beforeCommit = Boolean.parseBoolean(testParameters.getProperty(FAIL_BEFORE_COMMIT));
@@ -228,20 +238,23 @@ public class PingPongTestPerf extends AsymptoticTestCase //implements TimingCont
int batchSize = Integer.parseInt(testParameters.getProperty(BATCH_SIZE));
Boolean failOnce = Boolean.parseBoolean(testParameters.getProperty(FAIL_ONCE));
- // Establish a bounce back client on the ping queue to bounce back the pings.
- perThreadSetup._testPingBouncer = new PingPongBouncer(brokerDetails, username, password, virtualpath, queueName,
- persistent, transacted, selector, verbose);
+ synchronized(_lock)
+ {
+ // Establish a bounce back client on the ping queue to bounce back the pings.
+ perThreadSetup._testPingBouncer = new PingPongBouncer(brokerDetails, username, password, virtualpath,
+ queueName, persistent, transacted, selector, verbose, pubsub);
- // Start the connections for client and producer running.
- perThreadSetup._testPingBouncer.getConnection().start();
+ // Start the connections for client and producer running.
+ perThreadSetup._testPingBouncer.getConnection().start();
- // Establish a ping-pong client on the ping queue to send the pings with.
- perThreadSetup._testPingProducer = new PingPongProducer(brokerDetails, username, password, virtualpath,
+ // Establish a ping-pong client on the ping queue to send the pings with.
+
+ perThreadSetup._testPingProducer = new PingPongProducer(brokerDetails, username, password, virtualpath,
queueName, selector, transacted, persistent, messageSize,
verbose, afterCommit, beforeCommit, afterSend,
- beforeSend, failOnce, batchSize, 0, rate);
-
- perThreadSetup._testPingProducer.getConnection().start();
+ beforeSend, failOnce, batchSize, 0, rate, pubsub);
+ perThreadSetup._testPingProducer.getConnection().start();
+ }
// Attach the per-thread set to the thread.
threadSetup.set(perThreadSetup);