summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2007-06-25 14:16:30 +0000
committerMartin Ritchie <ritchiem@apache.org>2007-06-25 14:16:30 +0000
commit964e2b5b5d4c5492afce8ec7448010a7d418b537 (patch)
tree00d298d08b5db4968ad244bf23fd4f171ba710f9
parent0f0bb4d46355a335ddd4c97bf2b6d7af09fd8ca2 (diff)
downloadqpid-python-964e2b5b5d4c5492afce8ec7448010a7d418b537.tar.gz
Update to provide a SustainedTestCase, this sends batches of messages to the broker. The rate of publication is regulated by the average consume rate advertised by all connected clients.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2@550509 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/integrationtests/pom.xml6
-rw-r--r--java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/InteropClientTestCase.java9
-rw-r--r--java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/TestClient.java62
-rw-r--r--java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase1DummyRun.java5
-rw-r--r--java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase2BasicP2P.java5
-rw-r--r--java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase3BasicPubSub.java5
-rw-r--r--java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestClient.java582
-rw-r--r--java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestCoordinator.java9
8 files changed, 506 insertions, 177 deletions
diff --git a/java/integrationtests/pom.xml b/java/integrationtests/pom.xml
index ca3f67c659..9ccd153f54 100644
--- a/java/integrationtests/pom.xml
+++ b/java/integrationtests/pom.xml
@@ -46,6 +46,12 @@
<artifactId>qpid-client</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <version>1.4.0</version>
+ </dependency>
+
<dependency>
<groupId>uk.co.thebadgerset</groupId>
<artifactId>junit-toolkit</artifactId>
diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/InteropClientTestCase.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/InteropClientTestCase.java
index 9f769822de..37952d08c8 100644
--- a/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/InteropClientTestCase.java
+++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/InteropClientTestCase.java
@@ -79,12 +79,19 @@ public interface InteropClientTestCase extends MessageListener
/**
* Performs the test case actions.
- *
+ * return from here when you have finished the test.. this will signal the controller that the test has ended.
* @throws JMSException Any JMSException resulting from reading the message are allowed to fall through.
*/
public void start() throws JMSException;
/**
+ * Gives notice of termination of the test case actions.
+ *
+ * @throws JMSException Any JMSException resulting from allowed to fall through.
+ */
+ public void terminate() throws JMSException, InterruptedException;
+
+ /**
* Gets a report on the actions performed by the test case in its assigned role.
*
* @param session The session to create the report message in.
diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/TestClient.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/TestClient.java
index 6cca23446f..a82b05e20f 100644
--- a/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/TestClient.java
+++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/TestClient.java
@@ -20,23 +20,31 @@
*/
package org.apache.qpid.interop.testclient;
-import java.io.IOException;
-import java.util.*;
-
-import javax.jms.*;
-import javax.naming.Context;
-import javax.naming.InitialContext;
-import javax.naming.NamingException;
-
import org.apache.log4j.Logger;
-
import org.apache.qpid.interop.testclient.testcases.TestCase1DummyRun;
import org.apache.qpid.interop.testclient.testcases.TestCase2BasicP2P;
-import org.apache.qpid.interop.testclient.testcases.TestCase3BasicPubSub;
-import org.apache.qpid.util.ClasspathScanner;
import org.apache.qpid.util.CommandLineParser;
import org.apache.qpid.util.PropertiesUtils;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
/**
* Implements a test client as described in the interop testing spec
* (http://cwiki.apache.org/confluence/display/qpid/Interop+Testing+Specification). A test client is an agent that
@@ -201,7 +209,7 @@ public class TestClient implements MessageListener
}
// Open a connection to communicate with the coordinator on.
- _connection = createConnection(DEFAULT_CONNECTION_PROPS_RESOURCE, brokerUrl, virtualHost);
+ _connection = createConnection(DEFAULT_CONNECTION_PROPS_RESOURCE, clientName, brokerUrl, virtualHost);
session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -219,17 +227,21 @@ public class TestClient implements MessageListener
_connection.start();
}
+
+ public static Connection createConnection(String connectionPropsResource, String brokerUrl, String virtualHost)
+ {
+ return createConnection(connectionPropsResource, "clientID", brokerUrl, virtualHost);
+ }
+
/**
* Establishes a JMS connection using a properties file and qpids built in JNDI implementation. This is a simple
- * convenience method for code that does anticipate handling connection failures. All exceptions that indicate
- * that the connection has failed, are wrapped as rutime exceptions, preumably handled by a top level failure
- * handler.
- *
- * @todo Make username/password configurable. Allow multiple urls for fail over. Once it feels right, move it
- * to a Utils library class.
+ * convenience method for code that does anticipate handling connection failures. All exceptions that indicate that
+ * the connection has failed, are wrapped as rutime exceptions, preumably handled by a top level failure handler.
*
* @param connectionPropsResource The name of the connection properties file.
- * @param brokerUrl The broker url to connect to, <tt>null</tt> to use the default from the properties.
+ * @param clientID
+ * @param brokerUrl The broker url to connect to, <tt>null</tt> to use the default from the
+ * properties.
* @param virtualHost The virtual host to connectio to, <tt>null</tt> to use the default.
*
* @return A JMS conneciton.
@@ -237,7 +249,7 @@ public class TestClient implements MessageListener
* @todo Make username/password configurable. Allow multiple urls for fail over. Once it feels right, move it to a
* Utils library class.
*/
- public static Connection createConnection(String connectionPropsResource, String brokerUrl, String virtualHost)
+ public static Connection createConnection(String connectionPropsResource, String clientID, String brokerUrl, String virtualHost)
{
log.debug("public static Connection createConnection(String connectionPropsResource = " + connectionPropsResource
+ ", String brokerUrl = " + brokerUrl + ", String virtualHost = " + virtualHost + "): called");
@@ -251,7 +263,7 @@ public class TestClient implements MessageListener
if (brokerUrl != null)
{
String connectionString =
- "amqp://guest:guest/" + ((virtualHost != null) ? virtualHost : "") + "?brokerlist='" + brokerUrl + "'";
+ "amqp://guest:guest@" + clientID + "/" + ((virtualHost != null) ? virtualHost : "") + "?brokerlist='" + brokerUrl + "'";
connectionProps.setProperty(CONNECTION_PROPERTY, connectionString);
}
@@ -381,6 +393,14 @@ public class TestClient implements MessageListener
{
log.info("Received termination instruction from coordinator.");
+// try
+// {
+// currentTestCase.terminate();
+// }
+// catch (InterruptedException e)
+// {
+// //
+// }
// Is a cleaner shutdown needed?
_connection.close();
System.exit(0);
diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase1DummyRun.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase1DummyRun.java
index 85b89172bb..5f257c0b36 100644
--- a/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase1DummyRun.java
+++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase1DummyRun.java
@@ -74,6 +74,11 @@ public class TestCase1DummyRun implements InteropClientTestCase
// Do nothing.
}
+ public void terminate() throws JMSException
+ {
+ //todo
+ }
+
public Message getReport(Session session) throws JMSException
{
log.debug("public Message getReport(Session session): called");
diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase2BasicP2P.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase2BasicP2P.java
index ea62b46451..ff56ee9b93 100644
--- a/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase2BasicP2P.java
+++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase2BasicP2P.java
@@ -170,6 +170,11 @@ public class TestCase2BasicP2P implements InteropClientTestCase
}
}
+ public void terminate() throws JMSException
+ {
+ //todo
+ }
+
/**
* Gets a report on the actions performed by the test case in its assigned role.
*
diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase3BasicPubSub.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase3BasicPubSub.java
index 223c4916bf..7b35142c82 100644
--- a/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase3BasicPubSub.java
+++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase3BasicPubSub.java
@@ -202,6 +202,11 @@ public class TestCase3BasicPubSub implements InteropClientTestCase
}
}
+ public void terminate() throws JMSException, InterruptedException
+ {
+ //todo
+ }
+
/**
* Gets a report on the actions performed by the test case in its assigned role.
*
diff --git a/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestClient.java b/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestClient.java
index cabe73e331..1597da6dba 100644
--- a/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestClient.java
+++ b/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestClient.java
@@ -38,6 +38,7 @@ import javax.jms.Session;
import javax.jms.TextMessage;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.CountDownLatch;
/**
* Implements test case 3, basic pub/sub. Sends/received a specified number of messages to a specified route on the
@@ -52,7 +53,10 @@ import java.util.Map;
public class SustainedTestClient extends TestCase3BasicPubSub implements ExceptionListener
{
/** Used for debugging. */
- private static final Logger log = Logger.getLogger(SustainedTestClient.class);
+ private static final Logger debugLog = Logger.getLogger(SustainedTestClient.class);
+
+ private static final Logger log = Logger.getLogger("SustainedTest");
+
/** The role to be played by the test. */
private Roles role;
@@ -83,9 +87,11 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti
SustainedRateAdapter _rateAdapter;
/** */
- int updateInterval;
+ int _batchSize;
+
- private boolean _running = true;
+ private static final long TEN_MILLI_SEC = 10000000;
+ private static final long FIVE_MILLI_SEC = 5000000;
/**
* Should provide the name of the test case that this class implements. The exact names are defined in the interop
@@ -95,7 +101,7 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti
*/
public String getName()
{
- log.debug("public String getName(): called");
+ debugLog.debug("public String getName(): called");
return "Perf_SustainedPubSub";
}
@@ -111,31 +117,34 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti
*/
public void assignRole(Roles role, Message assignRoleMessage) throws JMSException
{
- log.debug("public void assignRole(Roles role = " + role + ", Message assignRoleMessage = " + assignRoleMessage
- + "): called");
+ debugLog.debug("public void assignRole(Roles role = " + role + ", Message assignRoleMessage = " + assignRoleMessage
+ + "): called");
// Take note of the role to be played.
this.role = role;
// Extract and retain the test parameters.
numReceivers = assignRoleMessage.getIntProperty("SUSTAINED_NUM_RECEIVERS");
- updateInterval = assignRoleMessage.getIntProperty("SUSTAINED_UPDATE_INTERVAL");
+ _batchSize = assignRoleMessage.getIntProperty("SUSTAINED_UPDATE_INTERVAL");
String sendKey = assignRoleMessage.getStringProperty("SUSTAINED_KEY");
String sendUpdateKey = assignRoleMessage.getStringProperty("SUSTAINED_UPDATE_KEY");
int ackMode = assignRoleMessage.getIntProperty("ACKNOWLEDGE_MODE");
- log.debug("numReceivers = " + numReceivers);
- log.debug("updateInterval = " + updateInterval);
- log.debug("ackMode = " + ackMode);
- log.debug("sendKey = " + sendKey);
- log.debug("sendUpdateKey = " + sendUpdateKey);
- log.debug("role = " + role);
+ if (debugLog.isDebugEnabled())
+ {
+ debugLog.debug("numReceivers = " + numReceivers);
+ debugLog.debug("_batchSize = " + _batchSize);
+ debugLog.debug("ackMode = " + ackMode);
+ debugLog.debug("sendKey = " + sendKey);
+ debugLog.debug("sendUpdateKey = " + sendUpdateKey);
+ debugLog.debug("role = " + role);
+ }
switch (role)
{
// Check if the sender role is being assigned, and set up a single message producer if so.
case SENDER:
- log.info("*********** Creating SENDER");
+ log.info("Creating Sender");
// Create a new connection to pass the test messages on.
connection = new Connection[1];
session = new Session[1];
@@ -164,7 +173,7 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti
// Otherwise the receiver role is being assigned, so set this up to listen for messages on the required number
// of receiver connections.
case RECEIVER:
- log.info("*********** Creating RECEIVER");
+ log.info("Creating Receiver");
// Create the required number of receiver connections.
connection = new Connection[numReceivers];
session = new Session[numReceivers];
@@ -183,7 +192,7 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti
MessageConsumer consumer = session[i].createConsumer(sendDestination);
- consumer.setMessageListener(new SustainedListener(TestClient.CLIENT_NAME + "-" + i, updateInterval, session[i], sendUpdateDestination));
+ consumer.setMessageListener(new SustainedListener(TestClient.CLIENT_NAME + "-" + i, _batchSize, session[i], sendUpdateDestination));
}
break;
@@ -196,29 +205,32 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti
}
}
+
/** Performs the test case actions. */
public void start() throws JMSException
{
- log.debug("public void start(): called");
+ debugLog.debug("public void start(): called");
// Check that the sender role is being performed.
switch (role)
{
// Check if the sender role is being assigned, and set up a single message producer if so.
case SENDER:
- Message testMessage = session[0].createTextMessage("test");
-
-// for (int i = 0; i < numMessages; i++)
- while (_running)
- {
- producer.send(testMessage);
-
- _rateAdapter.sentMessage();
- }
+ _rateAdapter.run();
break;
case RECEIVER:
}
+
+ //return from here when you have finished the test.. this will signal the controller and
+ }
+
+ public void terminate() throws JMSException, InterruptedException
+ {
+ if (_rateAdapter != null)
+ {
+ _rateAdapter.stop();
+ }
}
/**
@@ -232,7 +244,7 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti
*/
public Message getReport(Session session) throws JMSException
{
- log.debug("public Message getReport(Session session): called");
+ debugLog.debug("public Message getReport(Session session): called");
// Close the test connections.
for (int i = 0; i < connection.length; i++)
@@ -252,89 +264,100 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti
if (linked != null)
{
- if (linked instanceof AMQNoRouteException)
+ if (debugLog.isDebugEnabled())
{
- log.warn("No route .");
+ debugLog.debug("Linked Exception:" + linked);
}
- else if (linked instanceof AMQNoConsumersException)
- {
- log.warn("No clients currently available for message:" + ((AMQNoConsumersException) linked).getUndeliveredMessage());
- }
- else
+ if ((linked instanceof AMQNoRouteException)
+ || (linked instanceof AMQNoConsumersException))
{
+ if (debugLog.isDebugEnabled())
+ {
+ if (linked instanceof AMQNoConsumersException)
+ {
+ debugLog.warn("No clients currently available for message:" + ((AMQNoConsumersException) linked).getUndeliveredMessage());
+ }
+ else
+ {
+ debugLog.warn("No route for message");
+ }
+ }
- log.warn("LinkedException:" + linked);
+ // Tell the rate adapter that there are no clients ready yet
+ _rateAdapter.NO_CLIENTS = true;
}
-
- _rateAdapter.NO_CLIENTS = true;
}
else
{
- log.warn("Exception:" + linked);
+ debugLog.warn("Exception:" + linked);
}
}
+ /**
+ * Inner class that listens for messages and sends a report for the time taken between receiving the 'start' and
+ * 'end' messages.
+ */
class SustainedListener implements MessageListener
{
- private int _received = 0;
- private int _updateInterval = 0;
- private Long _time;
+ /** Number of messages received */
+ private long _received = 0;
+ /** The number of messages in the batch */
+ private int _batchSize = 0;
+ /** Record of the when the 'start' messagse was sen */
+ private Long _startTime;
+ /** Message producer to use to send reports */
MessageProducer _updater;
+ /** Session to create the report message on */
Session _session;
+ /** Record of the client ID used for this SustainedListnener */
String _client;
- public SustainedListener(String clientname, int updateInterval, Session session, Destination sendDestination) throws JMSException
+ /**
+ * Main Constructor
+ *
+ * @param clientname The _client id used to identify this connection.
+ * @param batchSize The number of messages that are to be sent per batch. Note: This is not used to
+ * control the interval between sending reports.
+ * @param session The session used for communication.
+ * @param sendDestination The destination that update reports should be sent to.
+ *
+ * @throws JMSException My occur if creatingthe Producer fails
+ */
+ public SustainedListener(String clientname, int batchSize, Session session, Destination sendDestination) throws JMSException
{
- _updateInterval = updateInterval;
+ _batchSize = batchSize;
_client = clientname;
_session = session;
_updater = session.createProducer(sendDestination);
}
- public void setReportInterval(int reportInterval)
- {
- _updateInterval = reportInterval;
- _received = 0;
- }
-
public void onMessage(Message message)
{
- if (log.isDebugEnabled())
+ if (debugLog.isTraceEnabled())
{
- log.debug("Message " + _received + "received in listener");
+ debugLog.trace("Message " + _received + "received in listener");
}
+
if (message instanceof TextMessage)
{
-
try
{
- if (((TextMessage) message).getText().equals("test"))
+ _received++;
+ if (((TextMessage) message).getText().equals("start"))
{
- if (_received == 0)
- {
- _time = System.nanoTime();
- sendStatus(0, _received);
- }
-
- _received++;
-
- if (_received % _updateInterval == 0)
+ debugLog.info("Starting Batch");
+ _startTime = System.nanoTime();
+ }
+ else if (((TextMessage) message).getText().equals("end"))
+ {
+ if (_startTime != null)
{
- Long currentTime = System.nanoTime();
-
- try
- {
- sendStatus(currentTime - _time, _received);
- _time = currentTime;
- }
- catch (JMSException e)
- {
- log.error("Unable to send update.");
- }
+ long currentTime = System.nanoTime();
+ sendStatus(currentTime - _startTime, _received);
+ debugLog.info("End Batch");
}
-
}
}
catch (JMSException e)
@@ -342,37 +365,68 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti
//ignore error
}
}
+
}
- private void sendStatus(long time, int received) throws JMSException
+ /**
+ * sendStatus creates and sends the report back to the publisher
+ *
+ * @param time taken for the the last batch
+ * @param received Total number of messages received.
+ *
+ * @throws JMSException if an error occurs during the send
+ */
+ private void sendStatus(long time, long received) throws JMSException
{
Message updateMessage = _session.createTextMessage("update");
- updateMessage.setStringProperty("CLIENT_ID", _client);
+ updateMessage.setStringProperty("CLIENT_ID", ":" + _client);
updateMessage.setStringProperty("CONTROL_TYPE", "UPDATE");
updateMessage.setLongProperty("RECEIVED", received);
updateMessage.setLongProperty("DURATION", time);
- log.info("**** SENDING **** CLIENT_ID:" + _client + " RECEIVED:" + received + " DURATION:" + time);
+ if (debugLog.isInfoEnabled())
+ {
+ debugLog.info("**** SENDING [" + received / _batchSize + "]**** "
+ + "CLIENT_ID:" + _client + " RECEIVED:" + received + " DURATION:" + time);
+ }
+
+ // Output on the main log.info the details of this batch
+ if (received / _batchSize % 10 == 0)
+ {
+ log.info("Sending Report [" + received / _batchSize + "] "
+ + "CLIENT_ID:" + _client + " RECEIVED:" + received + " DURATION:" + time);
+ }
_updater.send(updateMessage);
}
-
}
- class SustainedRateAdapter implements MessageListener
+
+ /**
+ * This class is used here to adjust the _delay value which in turn is used to control the number of messages/second
+ * that are sent through the test system.
+ *
+ * By keeping a record of the messages recevied and the average time taken to process the batch size can be
+ * calculated and so the delay can be adjusted to maintain that rate.
+ *
+ * Given that delays of < 10ms can be rounded up the delay is only used between messages if the _delay > 10ms * no
+ * messages in the batch. Otherwise the delay is used at the end of the batch.
+ */
+ class SustainedRateAdapter implements MessageListener, Runnable
{
private SustainedTestClient _client;
- private long _variance = 250; //no. messages to allow drifting
+ private long _messageVariance = 500; //no. messages to allow drifting
+ private long _timeVariance = TEN_MILLI_SEC * 5; // no. nanos between send and report delay (10ms)
private volatile long _delay; //in nanos
private long _sent;
private Map<String, Long> _slowClients = new HashMap<String, Long>();
- private static final long PAUSE_SLEEP = 10; // 10 ms
- private static final long NO_CLIENT_SLEEP = 1000; // 1s
- private static final long MAX_MESSAGE_DRIFT = 1000; // no messages drifted from producer
+ private static final long PAUSE_SLEEP = TEN_MILLI_SEC / 1000; // 10 ms
+ private static final long NO_CLIENT_SLEEP = 1000; // 1s
private volatile boolean NO_CLIENTS = true;
private int _delayShifting;
- private static final int REPORTS_WITHOUT_CHANGE = 10;
- private static final double MAXIMUM_DELAY_SHIFT = .02; //2%
+ private static final int REPORTS_WITHOUT_CHANGE = 5;
+ private boolean _warmedup = false;
+ private static final long EXPECTED_TIME_PER_BATCH = 100000L;
SustainedRateAdapter(SustainedTestClient client)
{
@@ -381,9 +435,9 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti
public void onMessage(Message message)
{
- if (log.isDebugEnabled())
+ if (debugLog.isDebugEnabled())
{
- log.debug("SustainedRateAdapter onMessage(Message message = " + message + "): called");
+ debugLog.debug("SustainedRateAdapter onMessage(Message message = " + message + "): called");
}
try
@@ -395,15 +449,25 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti
{
NO_CLIENTS = false;
long duration = message.getLongProperty("DURATION");
- long received = message.getLongProperty("RECEIVED");
+ long totalReceived = message.getLongProperty("RECEIVED");
String client = message.getStringProperty("CLIENT_ID");
- log.info("**** SENDING **** CLIENT_ID:" + client + " RECEIVED:" + received + " DURATION:" + duration);
+ if (debugLog.isInfoEnabled())
+ {
+ debugLog.info("Update Report: CLIENT_ID:" + client + " RECEIVED:" + totalReceived + " DURATION:" + duration);
+ }
+
+ recordSlow(client, totalReceived);
+ adjustDelay(client, totalReceived, duration);
- recordSlow(client, received);
- adjustDelay(client, received, duration);
+ if (!_warmedup && _totalReceived / _batchSize / delays.size() == _warmUpBatches / 2)
+ {
+ _warmedup = true;
+ _warmup.countDown();
+
+ }
}
}
catch (JMSException e)
@@ -412,72 +476,220 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti
}
}
- class Pair<X, Y>
+ CountDownLatch _warmup = new CountDownLatch(1);
+
+ int _warmUpBatches = 20;
+
+ int _numBatches = 10000;
+
+ // long[] _timings = new long[_numBatches];
+ private boolean _running = true;
+
+
+ public void run()
+ {
+ log.info("Warming up");
+
+ doBatch(_warmUpBatches);
+
+ try
+ {
+ //wait for warmup to complete.
+ _warmup.await();
+
+ //set delay to the average length of the batches
+ _delay = _totalDuration / _warmUpBatches / delays.size();
+
+ log.info("Warmup complete delay set : " + _delay
+ + " based on _totalDuration: " + _totalDuration
+ + " over no. batches: " + _warmUpBatches
+ + " with client count: " + delays.size());
+
+ _totalDuration = 0L;
+ _totalReceived = 0L;
+ _sent = 0L;
+ }
+ catch (InterruptedException e)
+ {
+ //
+ }
+
+
+ doBatch(_numBatches);
+
+ }
+
+ private void doBatch(int batchSize) // long[] timings,
{
- X item1;
- Y item2;
+ TextMessage testMessage = null;
+ try
+ {
+ testMessage = _client.session[0].createTextMessage("start");
+
- Pair(X i1, Y i2)
+ for (int batch = 0; batch < batchSize; batch++)
+// while (_running)
+ {
+ long start = System.nanoTime();
+
+ testMessage.setText("start");
+ _client.producer.send(testMessage);
+ _rateAdapter.sentMessage();
+
+ testMessage.setText("test");
+ //start at 2 so start and end count as part of batch
+ for (int m = 2; m < _batchSize; m++)
+ {
+ _client.producer.send(testMessage);
+ _rateAdapter.sentMessage();
+ }
+
+ testMessage.setText("end");
+ _client.producer.send(testMessage);
+ _rateAdapter.sentMessage();
+
+ long end = System.nanoTime();
+
+ long sendtime = end - start;
+
+ debugLog.info("Sent batch[" + batch + "](" + _batchSize + ") in " + sendtime);//timings[batch]);
+
+ if (batch % 10 == 0)
+ {
+ log.info("Sent Batch[" + batch + "](" + _batchSize + ")" + status());
+ }
+
+ _rateAdapter.sleepBatch();
+
+ }
+ }
+ catch (JMSException e)
{
- item1 = i1;
- item2 = i2;
+ log.error("Runner ended");
}
+ }
- X getItem1()
+ private String status()
+ {
+ return " TotalDuration: " + _totalDuration + " for " + delays.size() + " consumers"
+ + " Delay is " + _delay + " resulting in "
+ + ((_delay > TEN_MILLI_SEC * _batchSize) ? (_delay / _batchSize) + "/msg" : _delay + "/batch");
+ }
+
+ private void sleepBatch()
+ {
+ if (checkForSlowClients())
+ {//if there werwe slow clients we have already slept so don't sleep anymore again.
+ return;
+ }
+
+ //Slow down if gap between send and received is too large
+ if (_sent - _totalReceived / delays.size() > _messageVariance)
{
- return item1;
+ //pause between batches.
+ debugLog.info("Sleeping to keep sent in check with received");
+ log.debug("Increaseing _delay as sending more than receiving");
+ _delay += TEN_MILLI_SEC;
}
- Y getItem2()
+ //per batch sleep.. if sleep is to small to spread over the batch.
+ if (_delay <= TEN_MILLI_SEC * _batchSize)
{
- return item2;
+ sleepLong(_delay);
+ }
+ else
+ {
+ debugLog.info("Not sleeping _delay > ten*batch is:" + _delay);
}
}
- Map<String, Pair<Long, Long>> delays = new HashMap<String, Pair<Long, Long>>();
- Long totalReceived = 0L;
- Long totalDuration = 0L;
+ public void stop()
+ {
+ _running = false;
+ }
- private void adjustDelay(String client, long received, long duration)
+ Map<String, Long> delays = new HashMap<String, Long>();
+ Long _totalReceived = 0L;
+ Long _totalDuration = 0L;
+ int _skipUpdate = 0;
+
+ /**
+ * Adjust the delay for sending messages based on this update from the client
+ *
+ * @param client The client that send this update
+ * @param totalReceived The number of messages that this client has received.
+ * @param duration The time taken for the last batch of messagse
+ */
+ private void adjustDelay(String client, long totalReceived, long duration)
{
- Pair<Long, Long> current = delays.get(client);
+ //Retrieve the current total time taken for this client.
+ Long currentTime = delays.get(client);
- if (current == null)
+ // Add the new duration time to this client
+ if (currentTime == null)
{
- delays.put(client, new Pair<Long, Long>(received, duration));
+ currentTime = duration;
}
else
{
- //reduce totals
- totalReceived -= current.getItem1();
- totalDuration -= current.getItem2();
+ currentTime += duration;
}
- totalReceived += received;
- totalDuration += duration;
+ delays.put(client, currentTime);
+
+
+ _totalReceived += _batchSize;
+ _totalDuration += duration;
- long averageDuration = totalDuration / delays.size();
+ // Calculate the number of messages in the batch.
+ long batchCount = (_totalReceived / _batchSize);
- long diff = Math.abs(_delay - averageDuration);
+ //calculate average duration accross clients per batch
+ long averageDuration = _totalDuration / delays.size() / batchCount;
+
+ //calculate the difference between current send delay and average report delay
+ long diff = (duration) - averageDuration;
+
+ if (debugLog.isInfoEnabled())
+ {
+ debugLog.info("TotalDuration:" + _totalDuration + " for " + delays.size() + " consumers"
+ + " on batch: " + batchCount
+ + " Batch Duration: " + duration
+ + " Average: " + averageDuration
+ + " so diff: " + diff + " for : " + client
+ + " Delay is " + _delay + " resulting in "
+ + ((_delay > TEN_MILLI_SEC * _batchSize)
+ ? (_delay / _batchSize) + "/msg" : _delay + "/batch"));
+ }
//if the averageDuration differs from the current by more than the specified variane then adjust delay.
- if (diff > _variance)
+ if (Math.abs(diff) > _timeVariance)
{
- if (averageDuration > _delay)
+
+ // if the the _delay is larger than the required duration to send report
+ // speed up
+ if (diff > TEN_MILLI_SEC)
{
- // we can go faster
- _delay -= diff;
+ _delay -= TEN_MILLI_SEC;
+
if (_delay < 0)
{
_delay = 0;
+ debugLog.info("Reset _delay to 0");
+ delayStable();
+ }
+ else
+ {
+ delayChanged();
}
+
}
- else
+ else if (diff < 0) // diff < 0 diff cannot be 0 as it is > _timeVariance
{
- // we need to slow down
- _delay += diff;
+ // the report took longer
+ _delay += TEN_MILLI_SEC;
+ delayChanged();
}
- delayChanged();
}
else
{
@@ -486,11 +698,16 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti
}
+ /** Reset the number of iterations before we say the delay has stabilised. */
private void delayChanged()
{
_delayShifting = REPORTS_WITHOUT_CHANGE;
}
+ /**
+ * Record the fact that delay has stabilised If delay has stablised for REPORTS_WITHOUT_CHANGE then it will
+ * output Delay stabilised
+ */
private void delayStable()
{
_delayShifting--;
@@ -498,14 +715,20 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti
if (_delayShifting < 0)
{
_delayShifting = 0;
- log.info("Delay stabilised:" + _delay);
+ log.debug("Delay stabilised:" + _delay);
}
}
- // Record Slow clients
+ /**
+ * Checks that the client has received enough messages. If the client has fallen behind then they are put in the
+ * _slowClients lists which will increase the delay.
+ *
+ * @param client The client identifier to check
+ * @param received the number of messages received by that client
+ */
private void recordSlow(String client, long received)
{
- if (received < (_sent - _variance))
+ if (received < (_sent - _messageVariance))
{
_slowClients.put(client, received);
}
@@ -515,20 +738,49 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti
}
}
+ /** Incrment the number of sent messages and then sleep, if required. */
public void sentMessage()
{
- if (_sent % updateInterval == 0)
+
+ _sent++;
+
+ if (_delay > TEN_MILLI_SEC * _batchSize)
{
+ long batchDelay = _delay / _batchSize;
+ // less than 10ms sleep doesn't always work.
+ // _delay is in nano seconds
+// if (batchDelay < (TEN_MILLI_SEC))
+// {
+// sleep(0, (int) batchDelay);
+// }
+// else
+ {
+// if (batchDelay < 30000000000L)
+ {
+ sleepLong(batchDelay);
+ }
+ }
+ }
+ }
+
+ /**
+ * Check at the end of each batch and pause sending messages to allow slow clients to catch up.
+ *
+ * @return true if there were slow clients that caught up.
+ */
+ private boolean checkForSlowClients()
+ {
+ if (_sent % _batchSize == 0)
+ {
// Cause test to pause when we have slow
if (!_slowClients.isEmpty() || NO_CLIENTS)
{
- log.info("Pausing for slow clients");
-
- //_delay <<= 1;
+ debugLog.info("Pausing for slow clients:" + _slowClients.entrySet().toArray());
while (!_slowClients.isEmpty())
{
+ debugLog.info(_slowClients.size() + " slow clients.");
sleep(PAUSE_SLEEP);
}
@@ -537,45 +789,67 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti
sleep(NO_CLIENT_SLEEP);
}
- log.debug("Continuing");
- return;
+ debugLog.debug("Continuing");
+ return true;
}
else
{
- log.info("Delay:" + _delay);
+ debugLog.info("Delay:" + _delay);
}
+
}
- _sent++;
+ return false;
+ }
- if (_delay > 0)
- {
- // less than 10ms sleep doesn't work.
- // _delay is in nano seconds
- if (_delay < 1000000)
- {
- sleep(0, (int) _delay);
- }
- else
- {
- if (_delay < 30000000000L)
- {
- sleep(_delay / 1000000, (int) (_delay % 1000000));
- }
- }
- }
+ /**
+ * Sleep normally takes micro-seconds this allows the use of a nano-second value.
+ *
+ * @param delay nanoseconds to sleep for.
+ */
+ private void sleepLong(long delay)
+ {
+ sleep(delay / 1000000, (int) (delay % 1000000));
}
+ /**
+ * Sleep for the specified micro-seconds.
+ * @param sleep microseconds to sleep for.
+ */
private void sleep(long sleep)
{
sleep(sleep, 0);
}
+ /**
+ * Perform the sleep , swallowing any InteruptException.
+ *
+ * NOTE: If a sleep request is > 10s then reset only sleep for 5s
+ *
+ * @param milli to sleep for
+ * @param nano sub miliseconds to sleep for
+ */
private void sleep(long milli, int nano)
{
try
{
- log.debug("Sleep:" + milli + ":" + nano);
+ debugLog.debug("Sleep:" + milli + ":" + nano);
+ if (milli > 10000)
+ {
+
+ if (_delay == milli)
+ {
+ _totalDuration = _totalReceived / _batchSize * EXPECTED_TIME_PER_BATCH;
+ debugLog.error("Sleeping for more than 10 seconds adjusted to 5s!:" + milli / 1000 + "s. Reset _totalDuration:" + _totalDuration);
+ }
+ else
+ {
+ debugLog.error("Sleeping for more than 10 seconds adjusted to 5s!:" + milli / 1000 + "s");
+ }
+
+ milli = 5000;
+ }
+
Thread.sleep(milli, nano);
}
catch (InterruptedException e)
@@ -583,6 +857,12 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti
//
}
}
+
+ public void setClient(SustainedTestClient client)
+ {
+ _client = client;
+ }
}
}
+
diff --git a/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestCoordinator.java b/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestCoordinator.java
index 4081d87192..b437e165b4 100644
--- a/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestCoordinator.java
+++ b/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestCoordinator.java
@@ -78,11 +78,12 @@ public class SustainedTestCoordinator extends CoordinatingTestCase3BasicPubSub i
Map<String, Object> testConfig = new HashMap<String, Object>();
testConfig.put("TEST_NAME", "Perf_SustainedPubSub");
testConfig.put("SUSTAINED_KEY", SUSTAINED_KEY);
- //testConfig.put("SUSTAINED_MSG_RATE", 10);
- testConfig.put("SUSTAINED_NUM_RECEIVERS", 2);
- testConfig.put("SUSTAINED_UPDATE_INTERVAL", 25);
+ testConfig.put("SUSTAINED_NUM_RECEIVERS", Integer.getInteger("numReceives", 2));
+ testConfig.put("SUSTAINED_UPDATE_INTERVAL", Integer.getInteger("batchSize", 1000));
testConfig.put("SUSTAINED_UPDATE_KEY", SUSTAINED_KEY + ".UPDATE");
- testConfig.put("ACKNOWLEDGE_MODE", AMQSession.NO_ACKNOWLEDGE);
+ testConfig.put("ACKNOWLEDGE_MODE", Integer.getInteger("ackMode", AMQSession.AUTO_ACKNOWLEDGE));
+
+ log.info("Created Config: " + testConfig.entrySet().toArray());
sequenceTest(testConfig);
}