summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBhupendra Bhusman Bhardwaj <bhupendrab@apache.org>2007-03-01 17:57:24 +0000
committerBhupendra Bhusman Bhardwaj <bhupendrab@apache.org>2007-03-01 17:57:24 +0000
commit0b58d3664c44f5828db58c914718d5a46ef0336f (patch)
tree5832a7dcffe804799dbba8a0cefe1b392a7eabd9
parent5d2bd6704be7f9fcc010245a957f6798ea00ec0d (diff)
downloadqpid-python-0b58d3664c44f5828db58c914718d5a46ef0336f.tar.gz
QPID-395 : pause between batches of messages
QPID-367 : config updation for message alerts git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/perftesting@513419 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/broker/etc/virtualhosts.xml58
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java12
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java3
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java4
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java38
-rw-r--r--qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java16
-rw-r--r--qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java5
-rw-r--r--qpid/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java4
8 files changed, 80 insertions, 60 deletions
diff --git a/qpid/java/broker/etc/virtualhosts.xml b/qpid/java/broker/etc/virtualhosts.xml
index 48aa492421..48442d7980 100644
--- a/qpid/java/broker/etc/virtualhosts.xml
+++ b/qpid/java/broker/etc/virtualhosts.xml
@@ -22,10 +22,12 @@
<virtualhosts>
<virtualhost>
<name>localhost</name>
-
<localhost>
<minimumAlertRepeatGap>30000</minimumAlertRepeatGap>
<maximumMessageCount>5000</maximumMessageCount>
+ <maximumQueueDepth>4235264</maximumQueueDepth> <!-- 4Mb -->
+ <maximumMessageSize>2117632</maximumMessageSize> <!-- 2Mb -->
+ <maximumMessageAge>600000</maximumMessageAge>
<queue>
<name>queue</name>
<queue>
@@ -47,11 +49,13 @@
</localhost>
</virtualhost>
<virtualhost>
- <name>development</name>
-
+ <name>development</name>
<development>
- <minimumAlertRepeatGap>30000</minimumAlertRepeatGap>
- <maximumMessageCount>5000</maximumMessageCount>
+ <minimumAlertRepeatGap>30000</minimumAlertRepeatGap>
+ <maximumMessageCount>5000</maximumMessageCount>
+ <maximumQueueDepth>4235264</maximumQueueDepth> <!-- 4Mb -->
+ <maximumMessageSize>2117632</maximumMessageSize> <!-- 2Mb -->
+ <maximumMessageAge>600000</maximumMessageAge>
<queue>
<name>queue</name>
<queue>
@@ -72,30 +76,32 @@
</queue>
</development>
</virtualhost>
- <virtualhost>
- <name>test</name>
-
- <test>
+ <virtualhost>
+ <name>test</name>
+ <test>
<minimumAlertRepeatGap>30000</minimumAlertRepeatGap>
<maximumMessageCount>5000</maximumMessageCount>
+ <maximumQueueDepth>4235264</maximumQueueDepth> <!-- 4Mb -->
+ <maximumMessageSize>2117632</maximumMessageSize> <!-- 2Mb -->
+ <maximumMessageAge>600000</maximumMessageAge>
+ <queue>
+ <name>queue</name>
<queue>
- <name>queue</name>
- <queue>
- <exchange>amq.direct</exchange>
- <maximumQueueDepth>4235264</maximumQueueDepth> <!-- 4Mb -->
- <maximumMessageSize>2117632</maximumMessageSize> <!-- 2Mb -->
- <maximumMessageAge>600000</maximumMessageAge> <!-- 10 mins -->
- </queue>
- </queue>
- <queue>
- <name>ping</name>
- <ping>
- <exchange>amq.direct</exchange>
- <maximumQueueDepth>4235264</maximumQueueDepth> <!-- 4Mb -->
- <maximumMessageSize>2117632</maximumMessageSize> <!-- 2Mb -->
- <maximumMessageAge>600000</maximumMessageAge> <!-- 10 mins -->
- </ping>
+ <exchange>amq.direct</exchange>
+ <maximumQueueDepth>4235264</maximumQueueDepth> <!-- 4Mb -->
+ <maximumMessageSize>2117632</maximumMessageSize> <!-- 2Mb -->
+ <maximumMessageAge>600000</maximumMessageAge> <!-- 10 mins -->
</queue>
- </test>
+ </queue>
+ <queue>
+ <name>ping</name>
+ <ping>
+ <exchange>amq.direct</exchange>
+ <maximumQueueDepth>22352</maximumQueueDepth> <!-- 4Mb -->
+ <maximumMessageSize>11176</maximumMessageSize> <!-- 2Mb -->
+ <maximumMessageAge>6000</maximumMessageAge> <!-- 10 mins -->
+ </ping>
+ </queue>
+ </test>
</virtualhost>
</virtualhosts>
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index 06467277c6..ae4fb125b8 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -79,7 +79,7 @@ public class AMQQueue implements Managable, Comparable
/** max allowed number of messages on a queue. */
@Configured(path = "maximumMessageCount", defaultValue = "0")
- public int _maximumMessageCount;
+ public int _maximumMessageCount = 5000;
/** max queue depth for the queue */
@Configured(path = "maximumQueueDepth", defaultValue = "0")
@@ -89,7 +89,7 @@ public class AMQQueue implements Managable, Comparable
* maximum message age before alerts occur
*/
@Configured(path = "maximumMessageAge", defaultValue = "0")
- public long _maximumMessageAge = 30000; //0
+ public long _maximumMessageAge = 30000;
/*
* the minimum interval between sending out consequetive alerts of the same type
@@ -506,10 +506,10 @@ public class AMQQueue implements Managable, Comparable
protected void updateReceivedMessageCount(AMQMessage msg)
{
- if (msg.isRedelivered())
- return;
-
- _totalMessagesReceived++;
+ if (!msg.isRedelivered())
+ {
+ _totalMessagesReceived++;
+ }
_managedObject.checkForNotification(msg);
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
index ec9aad05a9..9657fcfcb3 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
@@ -177,7 +177,8 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que
public Long getMaximumQueueDepth()
{
- return _queue.getMaximumQueueDepth();
+ long queueDepthInBytes = _queue.getMaximumQueueDepth();
+ return queueDepthInBytes >> 10;
}
public void setMaximumQueueDepth(Long value)
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java b/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java
index 2a384c5a84..3aedad311c 100644
--- a/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java
@@ -74,11 +74,11 @@ public class PingClient extends PingPongProducer
String selector, boolean transacted, boolean persistent, int messageSize, boolean verbose,
boolean afterCommit, boolean beforeCommit, boolean afterSend, boolean beforeSend, boolean failOnce,
int txBatchSize, int noOfDestinations, int rate, boolean pubsub, boolean unique,
- int ackMode) throws Exception
+ int ackMode, long pausetime) throws Exception
{
super(brokerDetails, username, password, virtualpath, destinationName, selector, transacted, persistent, messageSize,
verbose, afterCommit, beforeCommit, afterSend, beforeSend, failOnce, txBatchSize, noOfDestinations, rate,
- pubsub, unique, ackMode);
+ pubsub, unique, ackMode, pausetime);
_pingClientCount++;
}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java b/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
index ce581a6582..26e5ee4f64 100644
--- a/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
@@ -146,6 +146,8 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
public static final String ACK_MODE_PROPNAME = "ackMode";
+ public static final String PAUSE_AFTER_BATCH_PROPNAME = "pausetimeAfterEachBatch";
+
/** Used to set up a default message size. */
public static final int DEFAULT_MESSAGE_SIZE = 0;
@@ -186,7 +188,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
public static final String DEFAULT_BROKER = "tcp://localhost:5672";
/** Holds the default virtual path for the test. */
- public static final String DEFAULT_VIRTUAL_PATH = "test";
+ public static final String DEFAULT_VIRTUAL_PATH = "/test";
/** Holds the pub/sub mode default, true means ping a topic, false means ping a queue. */
public static final boolean DEFAULT_PUBSUB = false;
@@ -314,6 +316,8 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
/** Holds the number of sends that should be performed in every transaction when using transactions. */
protected int _txBatchSize = 1;
+ private static long _pausetimeAfterEachBatch = 0;
+
/**
* Holds the number of consumers that will be attached to each topic.
* Each pings will result in a reply from each of the attached clients
@@ -353,7 +357,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
String destinationName, String selector, boolean transacted, boolean persistent, int messageSize,
boolean verbose, boolean afterCommit, boolean beforeCommit, boolean afterSend,
boolean beforeSend, boolean failOnce, int txBatchSize, int noOfDestinations, int rate,
- boolean pubsub, boolean unique, int ackMode) throws Exception
+ boolean pubsub, boolean unique, int ackMode, long pause) throws Exception
{
_logger.debug("public PingPongProducer(String brokerDetails = " + brokerDetails + ", String username = " + username
+ ", String password = " + password + ", String virtualpath = " + virtualpath
@@ -378,6 +382,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
_txBatchSize = txBatchSize;
_isPubSub = pubsub;
_isUnique = unique;
+ _pausetimeAfterEachBatch = pause;
if (ackMode != 0)
{
_ackMode = ackMode;
@@ -435,7 +440,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
}
String brokerDetails = config.getHost() + ":" + config.getPort();
- String virtualpath = "/test";
+ String virtualpath = DEFAULT_VIRTUAL_PATH;
String selector = (config.getSelector() == null) ? DEFAULT_SELECTOR : config.getSelector();
boolean verbose = true;
boolean transacted = config.isTransacted();
@@ -495,7 +500,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
PingPongProducer pingProducer =
new PingPongProducer(brokerDetails, DEFAULT_USERNAME, DEFAULT_PASSWORD, virtualpath, destName, selector,
transacted, persistent, messageSize, verbose, afterCommit, beforeCommit, afterSend,
- beforeSend, failOnce, batchSize, destCount, rate, pubsub, false, 0);
+ beforeSend, failOnce, batchSize, destCount, rate, pubsub, false, 0, 0);
pingProducer.getConnection().start();
@@ -732,6 +737,12 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
return pingAndWaitForReply(message, numPings, timeout, messageCorrelationId);
}
+ public int pingAndWaitForReply(int numPings, long timeout, String messageCorrelationId)
+ throws JMSException, InterruptedException
+ {
+ return pingAndWaitForReply(null, numPings, timeout, messageCorrelationId);
+ }
+
/**
* Sends the specified number of ping message and then waits for all correlating replies. If the wait times out
* before a reply arrives, then a null reply is returned from this method. This method allows the caller to specify
@@ -834,6 +845,11 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
_logger.debug("public void pingNoWaitForReply(Message message, int numPings = " + numPings
+ ", String messageCorrelationId = " + messageCorrelationId + "): called");
+ if (message == null)
+ {
+ message = getTestMessage(getReplyDestinations().get(0), _messageSize, _persistent);
+ }
+
message.setJMSCorrelationID(messageCorrelationId);
// Set up a committed flag to detect uncommitted messages at the end of the send loop. This may occurr if the
@@ -865,6 +881,12 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
{
commitTx(_producerSession);
committed = true;
+ /* This pause is required for some cases. eg in load testing when sessions are non-transacted the
+ Mina IO layer can't clear the cache in time. So this pause gives enough time for mina to clear
+ the cache (without this mina throws OutOfMemoryError). pause() will check if time is != 0
+ */
+ pause(_pausetimeAfterEachBatch);
+ //_logger.info("committed " + _txBatchSize + " " + i);
}
// Spew out per message timings on every message sonly in verbose mode.
@@ -909,10 +931,10 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
}
}
- /*public Destination getReplyDestination()
+ public Destination getReplyDestination()
{
- return _replyDestination;
- }*/
+ return getReplyDestinations().get(0);
+ }
/**
* Sets a chained message listener. The message listener on this pinger, chains all its messages to the one set
@@ -1095,7 +1117,9 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
doFailover();
}
+ long l = System.currentTimeMillis();
session.commit();
+ _logger.debug("Time taken to commit :" + (System.currentTimeMillis() - l) + " ms" );
if (_failAfterCommit)
{
diff --git a/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java b/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java
index e0ce2b48b2..07a27f6031 100644
--- a/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java
+++ b/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java
@@ -158,21 +158,10 @@ public class PingAsyncTestPerf extends PingTestPerf implements TimingControllerA
perCorrelationId._tc = tc;
perCorrelationId._expectedCount = pingClient.getExpectedNumPings(numPings);
perCorrelationIds.put(perThreadSetup._correlationId, perCorrelationId);
-
- // Attach the chained message listener to the ping producer to listen asynchronously for the replies to these
- // messages.
- //pingClient.setChainedMessageListener(batchedResultsListener);
-
- // Generate a sample message of the specified size.
- perThreadSetup._message =
- pingClient.getTestMessage(perThreadSetup._pingClient.getReplyDestinations().get(0),
- testParameters.getPropertyAsInteger(PingPongProducer.MESSAGE_SIZE_PROPNAME),
- testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME));
-
// Send the requested number of messages, and wait until they have all been received.
long timeout = Long.parseLong(testParameters.getProperty(PingPongProducer.TIMEOUT_PROPNAME));
- int numReplies = pingClient.pingAndWaitForReply(perThreadSetup._message, numPings, timeout, perThreadSetup._correlationId);
+ int numReplies = pingClient.pingAndWaitForReply(numPings, timeout, perThreadSetup._correlationId);
// Check that all the replies were received and log a fail if they were not.
if (numReplies < perCorrelationId._expectedCount)
@@ -180,9 +169,6 @@ public class PingAsyncTestPerf extends PingTestPerf implements TimingControllerA
perCorrelationId._tc.completeTest(false, numPings - perCorrelationId._expectedCount);
}
- // Remove the chained message listener from the ping producer.
- //pingClient.removeChainedMessageListener();
-
// Remove the expected count and timing controller for the message correlation id, to ensure they are cleaned up.
perCorrelationIds.remove(perThreadSetup._correlationId);
}
diff --git a/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java b/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java
index ecab3876ee..21f59f74f4 100644
--- a/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java
+++ b/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java
@@ -108,6 +108,7 @@ public class PingTestPerf extends AsymptoticTestCase implements TestThreadAware
testParameters.setPropertyIfNull(PingPongProducer.UNIQUE_PROPNAME, PingPongProducer.DEFAULT_UNIQUE);
testParameters.setSysPropertyIfNull(PingPongProducer.ACK_MODE_PROPNAME,
Integer.toString(PingPongProducer.DEFAULT_ACK_MODE));
+ testParameters.setSysPropertyIfNull(PingPongProducer.PAUSE_AFTER_BATCH_PROPNAME, 0l);
}
/**
@@ -192,6 +193,7 @@ public class PingTestPerf extends AsymptoticTestCase implements TestThreadAware
Boolean failOnce = testParameters.getPropertyAsBoolean(PingPongProducer.FAIL_ONCE_PROPNAME);
boolean unique = testParameters.getPropertyAsBoolean(PingPongProducer.UNIQUE_PROPNAME);
int ackMode = testParameters.getPropertyAsInteger(PingPongProducer.ACK_MODE_PROPNAME);
+ int pausetime = testParameters.getPropertyAsInteger(PingPongProducer.PAUSE_AFTER_BATCH_PROPNAME);
// Extract the test set up paramaeters.
int destinationscount =
@@ -206,7 +208,7 @@ public class PingTestPerf extends AsymptoticTestCase implements TestThreadAware
selector, transacted, persistent, messageSize, verbose,
failAfterCommit, failBeforeCommit, failAfterSend, failBeforeSend,
failOnce, batchSize, destinationscount, rate, pubsub,
- unique, ackMode);
+ unique, ackMode, pausetime);
}
// Start the client connection
perThreadSetup._pingClient.getConnection().start();
@@ -256,7 +258,6 @@ public class PingTestPerf extends AsymptoticTestCase implements TestThreadAware
* Holds the test ping client.
*/
protected PingClient _pingClient;
- protected Message _message;
protected String _correlationId;
}
}
diff --git a/qpid/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java b/qpid/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java
index 4121811b8e..32852d2868 100644
--- a/qpid/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java
+++ b/qpid/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java
@@ -113,6 +113,7 @@ public class PingPongTestPerf extends AsymptoticTestCase
ParsedProperties.setSysPropertyIfNull(PingPongProducer.UNIQUE_PROPNAME, Boolean.toString(PingPongProducer.DEFAULT_UNIQUE));
ParsedProperties.setSysPropertyIfNull(PingPongProducer.ACK_MODE_PROPNAME,
Integer.toString(PingPongProducer.DEFAULT_ACK_MODE));
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.PAUSE_AFTER_BATCH_PROPNAME, 0l);
}
/**
@@ -191,6 +192,7 @@ public class PingPongTestPerf extends AsymptoticTestCase
Boolean failOnce = testParameters.getPropertyAsBoolean(PingPongProducer.FAIL_ONCE_PROPNAME);
boolean unique = testParameters.getPropertyAsBoolean(PingPongProducer.UNIQUE_PROPNAME);
int ackMode = testParameters.getPropertyAsInteger(PingPongProducer.ACK_MODE_PROPNAME);
+ long pause = testParameters.getPropertyAsInteger(PingPongProducer.PAUSE_AFTER_BATCH_PROPNAME);
synchronized (this)
{
@@ -209,7 +211,7 @@ public class PingPongTestPerf extends AsymptoticTestCase
messageSize, verbose, failAfterCommit,
failBeforeCommit, failAfterSend, failBeforeSend,
failOnce, batchSize, 0, rate, pubsub,
- unique, ackMode);
+ unique, ackMode, pause);
perThreadSetup._testPingProducer.getConnection().start();
}