diff options
8 files changed, 99 insertions, 79 deletions
diff --git a/java/broker/etc/virtualhosts.xml b/java/broker/etc/virtualhosts.xml index 52ff23e090..c6dedd6433 100644 --- a/java/broker/etc/virtualhosts.xml +++ b/java/broker/etc/virtualhosts.xml @@ -69,9 +69,9 @@ <virtualhost> <name>development</name> <development> - <minimumAlertRepeatGap>30000</minimumAlertRepeatGap> - <maximumMessageCount>5000</maximumMessageCount> <queues> + <minimumAlertRepeatGap>30000</minimumAlertRepeatGap> + <maximumMessageCount>5000</maximumMessageCount> <queue> <name>queue</name> <queue> @@ -95,10 +95,10 @@ </virtualhost> <virtualhost> <name>test</name> - <test> - <minimumAlertRepeatGap>30000</minimumAlertRepeatGap> - <maximumMessageCount>5000</maximumMessageCount> + <test> <queues> + <minimumAlertRepeatGap>30000</minimumAlertRepeatGap> + <maximumMessageCount>5000</maximumMessageCount> <queue> <name>queue</name> <queue> diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index 429829e201..5bbe1671a7 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -632,7 +632,11 @@ public class AMQQueue implements Managable, Comparable protected void updateReceivedMessageCount(AMQMessage msg) throws AMQException { - _totalMessagesReceived.incrementAndGet(); + if (!msg.isRedelivered()) + { + _totalMessagesReceived.incrementAndGet(); + } + try { _managedObject.checkForNotification(msg); diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java index c9329a244c..254348dba0 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java @@ -209,7 +209,8 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que public Long getMaximumQueueDepth() { - return _queue.getMaximumQueueDepth(); + long queueDepthInBytes = _queue.getMaximumQueueDepth(); + return queueDepthInBytes >> 10 ; } public void setMaximumQueueDepth(Long value) @@ -222,31 +223,11 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que */ public Long getQueueDepth() throws JMException { - return getQueueDepthKb(); - } - - public long getQueueDepthKb() - { long queueBytesSize = _queue.getQueueDepth(); return queueBytesSize >> 10 ; } /** - * returns size of message in bytes - */ - private long getMessageSize(AMQMessage msg) throws AMQException - { - if (msg == null) - { - return 0l; - } - - return msg.getContentHeaderBody().bodySize; - } - - - - /** * Checks if there is any notification to be send to the listeners */ public void checkForNotification(AMQMessage msg) throws AMQException, JMException diff --git a/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java b/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java index 1a37f47b35..013bda5927 100644 --- a/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java +++ b/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java @@ -73,11 +73,12 @@ public class PingClient extends PingPongProducer public PingClient(String brokerDetails, String username, String password, String virtualpath, String destinationName,
String selector, boolean transacted, boolean persistent, int messageSize, boolean verbose,
boolean afterCommit, boolean beforeCommit, boolean afterSend, boolean beforeSend, boolean failOnce,
- int txBatchSize, int noOfDestinations, int rate, boolean pubsub, boolean unique) throws Exception
+ int txBatchSize, int noOfDestinations, int rate, boolean pubsub, boolean unique,
+ int ackMode, long pausetime) throws Exception
{
super(brokerDetails, username, password, virtualpath, destinationName, selector, transacted, persistent, messageSize,
verbose, afterCommit, beforeCommit, afterSend, beforeSend, failOnce, txBatchSize, noOfDestinations, rate,
- pubsub, unique);
+ pubsub, unique, ackMode, pausetime);
_pingClientCount++;
}
@@ -104,5 +105,4 @@ public class PingClient extends PingPongProducer return _pingClientCount;
}
}
-
}
diff --git a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java index 57d5c37fc6..f2fbd29314 100644 --- a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java +++ b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java @@ -163,6 +163,10 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis public static final String UNIQUE_PROPNAME = "uniqueDests";
+ public static final String ACK_MODE_PROPNAME = "ackMode";
+
+ public static final String PAUSE_AFTER_BATCH_PROPNAME = "pausetimeAfterEachBatch";
+
/**
* Used to set up a default message size.
*/
@@ -285,6 +289,8 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis public static final boolean DEFAULT_UNIQUE = true;
+ public static final int DEFAULT_ACK_MODE = Session.NO_ACKNOWLEDGE;
+
/**
* Holds the name of the property to store nanosecond timestamps in ping messages with.
*/
@@ -325,6 +331,8 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis */
protected boolean _persistent;
+ private int _ackMode = Session.NO_ACKNOWLEDGE;
+
/**
* Determines what size of messages this producer sends.
*/
@@ -421,6 +429,8 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis */
protected int _txBatchSize = 1;
+ private static long _pausetimeAfterEachBatch = 0;
+
/**
* Holds the number of consumers that will be attached to each topic.
* Each pings will result in a reply from each of the attached clients
@@ -460,7 +470,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis String destinationName, String selector, boolean transacted, boolean persistent, int messageSize,
boolean verbose, boolean afterCommit, boolean beforeCommit, boolean afterSend,
boolean beforeSend, boolean failOnce, int txBatchSize, int noOfDestinations, int rate,
- boolean pubsub, boolean unique) throws Exception
+ boolean pubsub, boolean unique, int ackMode, long pause) throws Exception
{
_logger.debug("public PingPongProducer(String brokerDetails = " + brokerDetails + ", String username = " + username
+ ", String password = " + password + ", String virtualpath = " + virtualpath
@@ -470,7 +480,8 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis + afterCommit + ", boolean beforeCommit = " + beforeCommit + ", boolean afterSend = " + afterSend
+ ", boolean beforeSend = " + beforeSend + ", boolean failOnce = " + failOnce + ", int txBatchSize = "
+ txBatchSize + ", int noOfDestinations = " + noOfDestinations + ", int rate = " + rate
- + ", boolean pubsub = " + pubsub + ", boolean unique = " + unique + "): called");
+ + ", boolean pubsub = " + pubsub + ", boolean unique = " + unique
+ + ", ackMode = " + ackMode + "): called");
// Keep all the relevant options.
_persistent = persistent;
@@ -484,7 +495,12 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis _txBatchSize = txBatchSize;
_isPubSub = pubsub;
_isUnique = unique;
-
+ _pausetimeAfterEachBatch = pause;
+ if (ackMode != 0)
+ {
+ _ackMode = ackMode;
+ }
+
// Check that one or more destinations were specified.
if (noOfDestinations < 1)
{
@@ -498,8 +514,8 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis _connection = new AMQConnection(brokerDetails, username, password, clientID, virtualpath);
// Create transactional or non-transactional sessions, based on the command line arguments.
- _producerSession = (Session) getConnection().createSession(transacted, Session.AUTO_ACKNOWLEDGE);
- _consumerSession = (Session) getConnection().createSession(transacted, Session.AUTO_ACKNOWLEDGE);
+ _producerSession = (Session) getConnection().createSession(transacted, _ackMode);
+ _consumerSession = (Session) getConnection().createSession(transacted, _ackMode);
// Set up a throttle to control the send rate, if a rate > 0 is specified.
if (rate > 0)
@@ -537,7 +553,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis }
String brokerDetails = config.getHost() + ":" + config.getPort();
- String virtualpath = "test";
+ String virtualpath = DEFAULT_VIRTUAL_PATH;
String selector = (config.getSelector() == null) ? DEFAULT_SELECTOR : config.getSelector();
boolean verbose = true;
boolean transacted = config.isTransacted();
@@ -597,7 +613,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis PingPongProducer pingProducer =
new PingPongProducer(brokerDetails, DEFAULT_USERNAME, DEFAULT_PASSWORD, virtualpath, destName, selector,
transacted, persistent, messageSize, verbose, afterCommit, beforeCommit, afterSend,
- beforeSend, failOnce, batchSize, destCount, rate, pubsub, false);
+ beforeSend, failOnce, batchSize, destCount, rate, pubsub, false, 0, 0);
pingProducer.getConnection().start();
@@ -687,31 +703,31 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis {
AMQDestination destination;
- int id;
+ String id;
// Generate an id, unique within this pinger or to the whole JVM depending on the unique flag.
if (unique)
{
_logger.debug("Creating unique destinations.");
- id = _queueJVMSequenceID.incrementAndGet();
+ id = "_" + _queueJVMSequenceID.incrementAndGet() + "_" + _connection.getClientID();
}
else
{
_logger.debug("Creating shared destinations.");
- id = _queueSharedId.incrementAndGet();
+ id = "_" + _queueSharedId.incrementAndGet();
}
// Check if this is a pub/sub pinger, in which case create topics.
if (_isPubSub)
{
- _logger.debug("Creating topics.");
destination = new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, rootName + id);
+ _logger.debug("Created topic " + destination);
}
// Otherwise this is a p2p pinger, in which case create queues.
else
{
- _logger.debug("Creating queues.");
destination = new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, rootName + id);
+ _logger.debug("Created queue " + destination);
}
// Keep the destination.
@@ -834,6 +850,12 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis return pingAndWaitForReply(message, numPings, timeout, messageCorrelationId);
}
+ public int pingAndWaitForReply(int numPings, long timeout, String messageCorrelationId)
+ throws JMSException, InterruptedException
+ {
+ return pingAndWaitForReply(null, numPings, timeout, messageCorrelationId);
+ }
+
/**
* Sends the specified number of ping message and then waits for all correlating replies. If the wait times out
* before a reply arrives, then a null reply is returned from this method. This method allows the caller to specify
@@ -936,6 +958,11 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis _logger.debug("public void pingNoWaitForReply(Message message, int numPings = " + numPings
+ ", String messageCorrelationId = " + messageCorrelationId + "): called");
+ if (message == null)
+ {
+ message = getTestMessage(getReplyDestinations().get(0), _messageSize, _persistent);
+ }
+
message.setJMSCorrelationID(messageCorrelationId);
// Set up a committed flag to detect uncommitted messages at the end of the send loop. This may occurr if the
@@ -967,6 +994,11 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis {
commitTx(_producerSession);
committed = true;
+ /* This pause is required for some cases. eg in load testing when sessions are non-transacted the
+ Mina IO layer can't clear the cache in time. So this pause gives enough time for mina to clear
+ the cache (without this mina throws OutOfMemoryError). pause() will check if time is != 0
+ */
+ pause(_pausetimeAfterEachBatch);
}
// Spew out per message timings on every message sonly in verbose mode.
@@ -1013,10 +1045,10 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis }
}
- /*public Destination getReplyDestination()
+ public Destination getReplyDestination()
{
- return _replyDestination;
- }*/
+ return getReplyDestinations().get(0);
+ }
/**
* Sets a chained message listener. The message listener on this pinger, chains all its messages to the one set
@@ -1203,7 +1235,9 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis doFailover();
}
+ long l = System.currentTimeMillis();
session.commit();
+ _logger.debug("Time taken to commit :" + (System.currentTimeMillis() - l) + " ms" );
if (_failAfterCommit)
{
diff --git a/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java b/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java index 7fd215d410..0483a6f20a 100644 --- a/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java +++ b/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java @@ -70,7 +70,7 @@ public class PingAsyncTestPerf extends PingTestPerf implements TimingControllerA /** Holds test specifics by correlation id. This consists of the expected number of messages and the timing controler. */ private Map<String, PerCorrelationId> perCorrelationIds = - Collections.synchronizedMap(new HashMap<String, PerCorrelationId>()); + Collections.synchronizedMap(new HashMap<String, PerCorrelationId>()); /** Holds the batched results listener, that does logging on batch boundaries. */ private BatchedResultsListener batchedResultsListener = null; @@ -86,12 +86,12 @@ public class PingAsyncTestPerf extends PingTestPerf implements TimingControllerA // Sets up the test parameters with defaults. testParameters.setPropertyIfNull(TEST_RESULTS_BATCH_SIZE_PROPNAME, - Integer.toString(DEFAULT_TEST_RESULTS_BATCH_SIZE)); + Integer.toString(DEFAULT_TEST_RESULTS_BATCH_SIZE)); } /** * Compile all the tests into a test suite. - * @return The test suite to run. Should only contain testAsyncPingOk method. + * @return The test suite to run. Should only contain testAsyncPingOk method. */ public static Test suite() { @@ -129,7 +129,7 @@ public class PingAsyncTestPerf extends PingTestPerf implements TimingControllerA * all replies have been received or a time out occurs before exiting this method. * * @param numPings The number of pings to send. - * @throws Exception pass all errors out to the test harness + * @throws Exception pass all errors out to the test harness */ public void testAsyncPingOk(int numPings) throws Exception { @@ -146,7 +146,8 @@ public class PingAsyncTestPerf extends PingTestPerf implements TimingControllerA PingClient pingClient = perThreadSetup._pingClient; // Advance the correlation id of messages to send, to make it unique for this run. - String messageCorrelationId = Long.toString(corellationIdGenerator.incrementAndGet()); + perThreadSetup._correlationId = Long.toString(corellationIdGenerator.incrementAndGet()); + String messageCorrelationId = perThreadSetup._correlationId; _logger.debug("messageCorrelationId = " + messageCorrelationId); // Initialize the count and timing controller for the new correlation id. @@ -154,33 +155,20 @@ public class PingAsyncTestPerf extends PingTestPerf implements TimingControllerA TimingController tc = getTimingController().getControllerForCurrentThread(); perCorrelationId._tc = tc; perCorrelationId._expectedCount = pingClient.getExpectedNumPings(numPings); - perCorrelationIds.put(messageCorrelationId, perCorrelationId); - - // Attach the chained message listener to the ping producer to listen asynchronously for the replies to these - // messages. - //pingClient.setChainedMessageListener(batchedResultsListener); - - // Generate a sample message of the specified size. - ObjectMessage msg = - pingClient.getTestMessage(perThreadSetup._pingClient.getReplyDestinations().get(0), - testParameters.getPropertyAsInteger(PingPongProducer.MESSAGE_SIZE_PROPNAME), - testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME)); + perCorrelationIds.put(perThreadSetup._correlationId, perCorrelationId); // Send the requested number of messages, and wait until they have all been received. long timeout = Long.parseLong(testParameters.getProperty(PingPongProducer.TIMEOUT_PROPNAME)); - int numReplies = pingClient.pingAndWaitForReply(msg, numPings, timeout, messageCorrelationId); + int numReplies = pingClient.pingAndWaitForReply(numPings, timeout, perThreadSetup._correlationId); // Check that all the replies were received and log a fail if they were not. if (numReplies < perCorrelationId._expectedCount) { - tc.completeTest(false, numPings - perCorrelationId._expectedCount); + perCorrelationId._tc.completeTest(false, numPings - perCorrelationId._expectedCount); } - // Remove the chained message listener from the ping producer. - //pingClient.removeChainedMessageListener(); - // Remove the expected count and timing controller for the message correlation id, to ensure they are cleaned up. - perCorrelationIds.remove(messageCorrelationId); + perCorrelationIds.remove(perThreadSetup._correlationId); } /** @@ -258,9 +246,9 @@ public class PingAsyncTestPerf extends PingTestPerf implements TimingControllerA // Extract the correlation id from the message. String correlationId = message.getJMSCorrelationID(); - _logger.debug("public void onMessage(Message message, int remainingCount = " + remainingCount - + "): called on batch boundary for message id: " + correlationId + " with thread id: " - + Thread.currentThread().getId()); + _logger.debug("public void onMessage(Message message, int remainingCount = " + remainingCount + + "): called on batch boundary for message id: " + correlationId + + " with thread id: " + Thread.currentThread().getId()); // Get the details for the correlation id and check that they are not null. They can become null // if a test times out. diff --git a/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java b/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java index 2057f09bc7..92acd18a38 100644 --- a/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java +++ b/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java @@ -66,7 +66,7 @@ public class PingTestPerf extends AsymptoticTestCase implements TestThreadAware ThreadLocal<PerThreadSetup> threadSetup = new ThreadLocal<PerThreadSetup>();
/** Holds a property reader to extract the test parameters from. */
- protected ParsedProperties testParameters = new TestContextProperties(System.getProperties());
+ protected ParsedProperties testParameters = new ParsedProperties(System.getProperties());
public PingTestPerf(String name)
{
@@ -107,6 +107,9 @@ public class PingTestPerf extends AsymptoticTestCase implements TestThreadAware PingPongProducer.DEFAULT_FAIL_BEFORE_SEND);
testParameters.setPropertyIfNull(PingPongProducer.FAIL_ONCE_PROPNAME, PingPongProducer.DEFAULT_FAIL_ONCE);
testParameters.setPropertyIfNull(PingPongProducer.UNIQUE_PROPNAME, PingPongProducer.DEFAULT_UNIQUE);
+ testParameters.setSysPropertyIfNull(PingPongProducer.ACK_MODE_PROPNAME,
+ Integer.toString(PingPongProducer.DEFAULT_ACK_MODE));
+ testParameters.setSysPropertyIfNull(PingPongProducer.PAUSE_AFTER_BATCH_PROPNAME, 0l);
}
/**
@@ -141,11 +144,11 @@ public class PingTestPerf extends AsymptoticTestCase implements TestThreadAware // Generate a sample message. This message is already time stamped and has its reply-to destination set.
ObjectMessage msg =
- perThreadSetup._pingClient.getTestMessage(perThreadSetup._pingClient.getReplyDestinations().get(0),
- testParameters.getPropertyAsInteger(
- PingPongProducer.MESSAGE_SIZE_PROPNAME),
- testParameters.getPropertyAsBoolean(
- PingPongProducer.PERSISTENT_MODE_PROPNAME));
+ perThreadSetup._pingClient.getTestMessage(perThreadSetup._pingClient.getReplyDestinations().get(0),
+ testParameters.getPropertyAsInteger(
+ PingPongProducer.MESSAGE_SIZE_PROPNAME),
+ testParameters.getPropertyAsBoolean(
+ PingPongProducer.PERSISTENT_MODE_PROPNAME));
// start the test
long timeout = Long.parseLong(testParameters.getProperty(PingPongProducer.TIMEOUT_PROPNAME));
@@ -190,10 +193,12 @@ public class PingTestPerf extends AsymptoticTestCase implements TestThreadAware int batchSize = testParameters.getPropertyAsInteger(PingPongProducer.COMMIT_BATCH_SIZE_PROPNAME);
Boolean failOnce = testParameters.getPropertyAsBoolean(PingPongProducer.FAIL_ONCE_PROPNAME);
boolean unique = testParameters.getPropertyAsBoolean(PingPongProducer.UNIQUE_PROPNAME);
+ int ackMode = testParameters.getPropertyAsInteger(PingPongProducer.ACK_MODE_PROPNAME);
+ int pausetime = testParameters.getPropertyAsInteger(PingPongProducer.PAUSE_AFTER_BATCH_PROPNAME);
// Extract the test set up paramaeters.
int destinationscount =
- Integer.parseInt(testParameters.getProperty(PingPongProducer.PING_DESTINATION_COUNT_PROPNAME));
+ Integer.parseInt(testParameters.getProperty(PingPongProducer.PING_DESTINATION_COUNT_PROPNAME));
// This is synchronized because there is a race condition, which causes one connection to sleep if
// all threads try to create connection concurrently.
@@ -203,7 +208,8 @@ public class PingTestPerf extends AsymptoticTestCase implements TestThreadAware perThreadSetup._pingClient = new PingClient(brokerDetails, username, password, virtualPath, destinationName,
selector, transacted, persistent, messageSize, verbose,
failAfterCommit, failBeforeCommit, failAfterSend, failBeforeSend,
- failOnce, batchSize, destinationscount, rate, pubsub, unique);
+ failOnce, batchSize, destinationscount, rate, pubsub,
+ unique, ackMode, pausetime);
}
// Start the client connection
perThreadSetup._pingClient.getConnection().start();
@@ -255,5 +261,6 @@ public class PingTestPerf extends AsymptoticTestCase implements TestThreadAware * Holds the test ping client.
*/
protected PingClient _pingClient;
+ protected String _correlationId;
}
}
diff --git a/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java b/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java index 3a89b1044e..d2e0367bba 100644 --- a/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java +++ b/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java @@ -111,6 +111,9 @@ public class PingPongTestPerf extends AsymptoticTestCase PingPongProducer.DEFAULT_FAIL_BEFORE_SEND);
ParsedProperties.setSysPropertyIfNull(PingPongProducer.FAIL_ONCE_PROPNAME, PingPongProducer.DEFAULT_FAIL_ONCE);
ParsedProperties.setSysPropertyIfNull(PingPongProducer.UNIQUE_PROPNAME, Boolean.toString(PingPongProducer.DEFAULT_UNIQUE));
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.ACK_MODE_PROPNAME,
+ Integer.toString(PingPongProducer.DEFAULT_ACK_MODE));
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.PAUSE_AFTER_BATCH_PROPNAME, 0l);
}
/**
@@ -188,6 +191,8 @@ public class PingPongTestPerf extends AsymptoticTestCase int batchSize = testParameters.getPropertyAsInteger(PingPongProducer.COMMIT_BATCH_SIZE_PROPNAME);
Boolean failOnce = testParameters.getPropertyAsBoolean(PingPongProducer.FAIL_ONCE_PROPNAME);
boolean unique = testParameters.getPropertyAsBoolean(PingPongProducer.UNIQUE_PROPNAME);
+ int ackMode = testParameters.getPropertyAsInteger(PingPongProducer.ACK_MODE_PROPNAME);
+ long pause = testParameters.getPropertyAsInteger(PingPongProducer.PAUSE_AFTER_BATCH_PROPNAME);
synchronized (this)
{
@@ -205,7 +210,8 @@ public class PingPongTestPerf extends AsymptoticTestCase destinationName, selector, transacted, persistent,
messageSize, verbose, failAfterCommit,
failBeforeCommit, failAfterSend, failBeforeSend,
- failOnce, batchSize, 0, rate, pubsub, unique);
+ failOnce, batchSize, 0, rate, pubsub,
+ unique, ackMode, pause);
perThreadSetup._testPingProducer.getConnection().start();
}
|