diff options
author | Martin Ritchie <ritchiem@apache.org> | 2007-06-27 15:34:57 +0000 |
---|---|---|
committer | Martin Ritchie <ritchiem@apache.org> | 2007-06-27 15:34:57 +0000 |
commit | f10117cd6464a107b086e0b7f7ea44a496b04c3d (patch) | |
tree | 1101a6639f81979756473ddc1181c986e91f89ef /java/integrationtests | |
parent | d7c68d138a1151db2b0d133c94f8b1843850e867 (diff) | |
download | qpid-python-f10117cd6464a107b086e0b7f7ea44a496b04c3d.tar.gz |
Merged revisions 549530-550509 via svnmerge from
https://svn.apache.org/repos/asf/incubator/qpid/branches/M2
........
r549530 | rupertlssmith | 2007-06-21 17:14:03 +0100 (Thu, 21 Jun 2007) | 1 line
Added minimal checkstyle to project reports. Fixed some problems with site generation.
........
r549849 | rupertlssmith | 2007-06-22 16:39:27 +0100 (Fri, 22 Jun 2007) | 1 line
Added Immediate and Mandatory message tests.
........
r550509 | ritchiem | 2007-06-25 15:16:30 +0100 (Mon, 25 Jun 2007) | 1 line
Update to provide a SustainedTestCase, this sends batches of messages to the broker. The rate of publication is regulated by the average consume rate advertised by all connected clients.
........
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@551199 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/integrationtests')
10 files changed, 516 insertions, 187 deletions
diff --git a/java/integrationtests/pom.xml b/java/integrationtests/pom.xml index 3afdf48204..9ccd153f54 100644 --- a/java/integrationtests/pom.xml +++ b/java/integrationtests/pom.xml @@ -31,6 +31,7 @@ <groupId>org.apache.qpid</groupId>
<artifactId>qpid</artifactId>
<version>1.0-incubating-M2-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
</parent>
<properties>
@@ -45,6 +46,12 @@ <artifactId>qpid-client</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <version>1.4.0</version>
+ </dependency>
+
<dependency>
<groupId>uk.co.thebadgerset</groupId>
<artifactId>junit-toolkit</artifactId>
diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/CoordinatingTestCase.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/CoordinatingTestCase.java index 31de84e630..d2042be741 100644 --- a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/CoordinatingTestCase.java +++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/CoordinatingTestCase.java @@ -1,4 +1,3 @@ -/* Copyright Rupert Smith, 2005 to 2006, all rights reserved. */
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -21,16 +20,16 @@ */
package org.apache.qpid.interop.coordinator;
-import java.util.Map;
-
-import javax.jms.*;
-
import junit.framework.TestCase;
import org.apache.log4j.Logger;
import org.apache.qpid.util.ConversationFactory;
+import javax.jms.*;
+
+import java.util.Map;
+
/**
* A CoordinatingTestCase is a JUnit test case extension that knows how to coordinate test clients that take part in a
* test case as defined in the interop testing specification
diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/InteropClientTestCase.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/InteropClientTestCase.java index 9f769822de..37952d08c8 100644 --- a/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/InteropClientTestCase.java +++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/InteropClientTestCase.java @@ -79,12 +79,19 @@ public interface InteropClientTestCase extends MessageListener /**
* Performs the test case actions.
- *
+ * return from here when you have finished the test.. this will signal the controller that the test has ended.
* @throws JMSException Any JMSException resulting from reading the message are allowed to fall through.
*/
public void start() throws JMSException;
/**
+ * Gives notice of termination of the test case actions.
+ *
+ * @throws JMSException Any JMSException resulting from allowed to fall through.
+ */
+ public void terminate() throws JMSException, InterruptedException;
+
+ /**
* Gets a report on the actions performed by the test case in its assigned role.
*
* @param session The session to create the report message in.
diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/TestClient.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/TestClient.java index 6cca23446f..a82b05e20f 100644 --- a/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/TestClient.java +++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/TestClient.java @@ -20,23 +20,31 @@ */
package org.apache.qpid.interop.testclient;
-import java.io.IOException;
-import java.util.*;
-
-import javax.jms.*;
-import javax.naming.Context;
-import javax.naming.InitialContext;
-import javax.naming.NamingException;
-
import org.apache.log4j.Logger;
-
import org.apache.qpid.interop.testclient.testcases.TestCase1DummyRun;
import org.apache.qpid.interop.testclient.testcases.TestCase2BasicP2P;
-import org.apache.qpid.interop.testclient.testcases.TestCase3BasicPubSub;
-import org.apache.qpid.util.ClasspathScanner;
import org.apache.qpid.util.CommandLineParser;
import org.apache.qpid.util.PropertiesUtils;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
/**
* Implements a test client as described in the interop testing spec
* (http://cwiki.apache.org/confluence/display/qpid/Interop+Testing+Specification). A test client is an agent that
@@ -201,7 +209,7 @@ public class TestClient implements MessageListener }
// Open a connection to communicate with the coordinator on.
- _connection = createConnection(DEFAULT_CONNECTION_PROPS_RESOURCE, brokerUrl, virtualHost);
+ _connection = createConnection(DEFAULT_CONNECTION_PROPS_RESOURCE, clientName, brokerUrl, virtualHost);
session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -219,17 +227,21 @@ public class TestClient implements MessageListener _connection.start();
}
+
+ public static Connection createConnection(String connectionPropsResource, String brokerUrl, String virtualHost)
+ {
+ return createConnection(connectionPropsResource, "clientID", brokerUrl, virtualHost);
+ }
+
/**
* Establishes a JMS connection using a properties file and qpids built in JNDI implementation. This is a simple
- * convenience method for code that does anticipate handling connection failures. All exceptions that indicate
- * that the connection has failed, are wrapped as rutime exceptions, preumably handled by a top level failure
- * handler.
- *
- * @todo Make username/password configurable. Allow multiple urls for fail over. Once it feels right, move it
- * to a Utils library class.
+ * convenience method for code that does anticipate handling connection failures. All exceptions that indicate that
+ * the connection has failed, are wrapped as rutime exceptions, preumably handled by a top level failure handler.
*
* @param connectionPropsResource The name of the connection properties file.
- * @param brokerUrl The broker url to connect to, <tt>null</tt> to use the default from the properties.
+ * @param clientID
+ * @param brokerUrl The broker url to connect to, <tt>null</tt> to use the default from the
+ * properties.
* @param virtualHost The virtual host to connectio to, <tt>null</tt> to use the default.
*
* @return A JMS conneciton.
@@ -237,7 +249,7 @@ public class TestClient implements MessageListener * @todo Make username/password configurable. Allow multiple urls for fail over. Once it feels right, move it to a
* Utils library class.
*/
- public static Connection createConnection(String connectionPropsResource, String brokerUrl, String virtualHost)
+ public static Connection createConnection(String connectionPropsResource, String clientID, String brokerUrl, String virtualHost)
{
log.debug("public static Connection createConnection(String connectionPropsResource = " + connectionPropsResource
+ ", String brokerUrl = " + brokerUrl + ", String virtualHost = " + virtualHost + "): called");
@@ -251,7 +263,7 @@ public class TestClient implements MessageListener if (brokerUrl != null)
{
String connectionString =
- "amqp://guest:guest/" + ((virtualHost != null) ? virtualHost : "") + "?brokerlist='" + brokerUrl + "'";
+ "amqp://guest:guest@" + clientID + "/" + ((virtualHost != null) ? virtualHost : "") + "?brokerlist='" + brokerUrl + "'";
connectionProps.setProperty(CONNECTION_PROPERTY, connectionString);
}
@@ -381,6 +393,14 @@ public class TestClient implements MessageListener {
log.info("Received termination instruction from coordinator.");
+// try
+// {
+// currentTestCase.terminate();
+// }
+// catch (InterruptedException e)
+// {
+// //
+// }
// Is a cleaner shutdown needed?
_connection.close();
System.exit(0);
diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase1DummyRun.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase1DummyRun.java index 85b89172bb..5f257c0b36 100644 --- a/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase1DummyRun.java +++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase1DummyRun.java @@ -74,6 +74,11 @@ public class TestCase1DummyRun implements InteropClientTestCase // Do nothing.
}
+ public void terminate() throws JMSException
+ {
+ //todo
+ }
+
public Message getReport(Session session) throws JMSException
{
log.debug("public Message getReport(Session session): called");
diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase2BasicP2P.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase2BasicP2P.java index ea62b46451..ff56ee9b93 100644 --- a/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase2BasicP2P.java +++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase2BasicP2P.java @@ -170,6 +170,11 @@ public class TestCase2BasicP2P implements InteropClientTestCase }
}
+ public void terminate() throws JMSException
+ {
+ //todo
+ }
+
/**
* Gets a report on the actions performed by the test case in its assigned role.
*
diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase3BasicPubSub.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase3BasicPubSub.java index 223c4916bf..7b35142c82 100644 --- a/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase3BasicPubSub.java +++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase3BasicPubSub.java @@ -202,6 +202,11 @@ public class TestCase3BasicPubSub implements InteropClientTestCase }
}
+ public void terminate() throws JMSException, InterruptedException
+ {
+ //todo
+ }
+
/**
* Gets a report on the actions performed by the test case in its assigned role.
*
diff --git a/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestClient.java b/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestClient.java index cabe73e331..1597da6dba 100644 --- a/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestClient.java +++ b/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestClient.java @@ -38,6 +38,7 @@ import javax.jms.Session; import javax.jms.TextMessage; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.CountDownLatch; /** * Implements test case 3, basic pub/sub. Sends/received a specified number of messages to a specified route on the @@ -52,7 +53,10 @@ import java.util.Map; public class SustainedTestClient extends TestCase3BasicPubSub implements ExceptionListener { /** Used for debugging. */ - private static final Logger log = Logger.getLogger(SustainedTestClient.class); + private static final Logger debugLog = Logger.getLogger(SustainedTestClient.class); + + private static final Logger log = Logger.getLogger("SustainedTest"); + /** The role to be played by the test. */ private Roles role; @@ -83,9 +87,11 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti SustainedRateAdapter _rateAdapter; /** */ - int updateInterval; + int _batchSize; + - private boolean _running = true; + private static final long TEN_MILLI_SEC = 10000000; + private static final long FIVE_MILLI_SEC = 5000000; /** * Should provide the name of the test case that this class implements. The exact names are defined in the interop @@ -95,7 +101,7 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti */ public String getName() { - log.debug("public String getName(): called"); + debugLog.debug("public String getName(): called"); return "Perf_SustainedPubSub"; } @@ -111,31 +117,34 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti */ public void assignRole(Roles role, Message assignRoleMessage) throws JMSException { - log.debug("public void assignRole(Roles role = " + role + ", Message assignRoleMessage = " + assignRoleMessage - + "): called"); + debugLog.debug("public void assignRole(Roles role = " + role + ", Message assignRoleMessage = " + assignRoleMessage + + "): called"); // Take note of the role to be played. this.role = role; // Extract and retain the test parameters. numReceivers = assignRoleMessage.getIntProperty("SUSTAINED_NUM_RECEIVERS"); - updateInterval = assignRoleMessage.getIntProperty("SUSTAINED_UPDATE_INTERVAL"); + _batchSize = assignRoleMessage.getIntProperty("SUSTAINED_UPDATE_INTERVAL"); String sendKey = assignRoleMessage.getStringProperty("SUSTAINED_KEY"); String sendUpdateKey = assignRoleMessage.getStringProperty("SUSTAINED_UPDATE_KEY"); int ackMode = assignRoleMessage.getIntProperty("ACKNOWLEDGE_MODE"); - log.debug("numReceivers = " + numReceivers); - log.debug("updateInterval = " + updateInterval); - log.debug("ackMode = " + ackMode); - log.debug("sendKey = " + sendKey); - log.debug("sendUpdateKey = " + sendUpdateKey); - log.debug("role = " + role); + if (debugLog.isDebugEnabled()) + { + debugLog.debug("numReceivers = " + numReceivers); + debugLog.debug("_batchSize = " + _batchSize); + debugLog.debug("ackMode = " + ackMode); + debugLog.debug("sendKey = " + sendKey); + debugLog.debug("sendUpdateKey = " + sendUpdateKey); + debugLog.debug("role = " + role); + } switch (role) { // Check if the sender role is being assigned, and set up a single message producer if so. case SENDER: - log.info("*********** Creating SENDER"); + log.info("Creating Sender"); // Create a new connection to pass the test messages on. connection = new Connection[1]; session = new Session[1]; @@ -164,7 +173,7 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti // Otherwise the receiver role is being assigned, so set this up to listen for messages on the required number // of receiver connections. case RECEIVER: - log.info("*********** Creating RECEIVER"); + log.info("Creating Receiver"); // Create the required number of receiver connections. connection = new Connection[numReceivers]; session = new Session[numReceivers]; @@ -183,7 +192,7 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti MessageConsumer consumer = session[i].createConsumer(sendDestination); - consumer.setMessageListener(new SustainedListener(TestClient.CLIENT_NAME + "-" + i, updateInterval, session[i], sendUpdateDestination)); + consumer.setMessageListener(new SustainedListener(TestClient.CLIENT_NAME + "-" + i, _batchSize, session[i], sendUpdateDestination)); } break; @@ -196,29 +205,32 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti } } + /** Performs the test case actions. */ public void start() throws JMSException { - log.debug("public void start(): called"); + debugLog.debug("public void start(): called"); // Check that the sender role is being performed. switch (role) { // Check if the sender role is being assigned, and set up a single message producer if so. case SENDER: - Message testMessage = session[0].createTextMessage("test"); - -// for (int i = 0; i < numMessages; i++) - while (_running) - { - producer.send(testMessage); - - _rateAdapter.sentMessage(); - } + _rateAdapter.run(); break; case RECEIVER: } + + //return from here when you have finished the test.. this will signal the controller and + } + + public void terminate() throws JMSException, InterruptedException + { + if (_rateAdapter != null) + { + _rateAdapter.stop(); + } } /** @@ -232,7 +244,7 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti */ public Message getReport(Session session) throws JMSException { - log.debug("public Message getReport(Session session): called"); + debugLog.debug("public Message getReport(Session session): called"); // Close the test connections. for (int i = 0; i < connection.length; i++) @@ -252,89 +264,100 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti if (linked != null) { - if (linked instanceof AMQNoRouteException) + if (debugLog.isDebugEnabled()) { - log.warn("No route ."); + debugLog.debug("Linked Exception:" + linked); } - else if (linked instanceof AMQNoConsumersException) - { - log.warn("No clients currently available for message:" + ((AMQNoConsumersException) linked).getUndeliveredMessage()); - } - else + if ((linked instanceof AMQNoRouteException) + || (linked instanceof AMQNoConsumersException)) { + if (debugLog.isDebugEnabled()) + { + if (linked instanceof AMQNoConsumersException) + { + debugLog.warn("No clients currently available for message:" + ((AMQNoConsumersException) linked).getUndeliveredMessage()); + } + else + { + debugLog.warn("No route for message"); + } + } - log.warn("LinkedException:" + linked); + // Tell the rate adapter that there are no clients ready yet + _rateAdapter.NO_CLIENTS = true; } - - _rateAdapter.NO_CLIENTS = true; } else { - log.warn("Exception:" + linked); + debugLog.warn("Exception:" + linked); } } + /** + * Inner class that listens for messages and sends a report for the time taken between receiving the 'start' and + * 'end' messages. + */ class SustainedListener implements MessageListener { - private int _received = 0; - private int _updateInterval = 0; - private Long _time; + /** Number of messages received */ + private long _received = 0; + /** The number of messages in the batch */ + private int _batchSize = 0; + /** Record of the when the 'start' messagse was sen */ + private Long _startTime; + /** Message producer to use to send reports */ MessageProducer _updater; + /** Session to create the report message on */ Session _session; + /** Record of the client ID used for this SustainedListnener */ String _client; - public SustainedListener(String clientname, int updateInterval, Session session, Destination sendDestination) throws JMSException + /** + * Main Constructor + * + * @param clientname The _client id used to identify this connection. + * @param batchSize The number of messages that are to be sent per batch. Note: This is not used to + * control the interval between sending reports. + * @param session The session used for communication. + * @param sendDestination The destination that update reports should be sent to. + * + * @throws JMSException My occur if creatingthe Producer fails + */ + public SustainedListener(String clientname, int batchSize, Session session, Destination sendDestination) throws JMSException { - _updateInterval = updateInterval; + _batchSize = batchSize; _client = clientname; _session = session; _updater = session.createProducer(sendDestination); } - public void setReportInterval(int reportInterval) - { - _updateInterval = reportInterval; - _received = 0; - } - public void onMessage(Message message) { - if (log.isDebugEnabled()) + if (debugLog.isTraceEnabled()) { - log.debug("Message " + _received + "received in listener"); + debugLog.trace("Message " + _received + "received in listener"); } + if (message instanceof TextMessage) { - try { - if (((TextMessage) message).getText().equals("test")) + _received++; + if (((TextMessage) message).getText().equals("start")) { - if (_received == 0) - { - _time = System.nanoTime(); - sendStatus(0, _received); - } - - _received++; - - if (_received % _updateInterval == 0) + debugLog.info("Starting Batch"); + _startTime = System.nanoTime(); + } + else if (((TextMessage) message).getText().equals("end")) + { + if (_startTime != null) { - Long currentTime = System.nanoTime(); - - try - { - sendStatus(currentTime - _time, _received); - _time = currentTime; - } - catch (JMSException e) - { - log.error("Unable to send update."); - } + long currentTime = System.nanoTime(); + sendStatus(currentTime - _startTime, _received); + debugLog.info("End Batch"); } - } } catch (JMSException e) @@ -342,37 +365,68 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti //ignore error } } + } - private void sendStatus(long time, int received) throws JMSException + /** + * sendStatus creates and sends the report back to the publisher + * + * @param time taken for the the last batch + * @param received Total number of messages received. + * + * @throws JMSException if an error occurs during the send + */ + private void sendStatus(long time, long received) throws JMSException { Message updateMessage = _session.createTextMessage("update"); - updateMessage.setStringProperty("CLIENT_ID", _client); + updateMessage.setStringProperty("CLIENT_ID", ":" + _client); updateMessage.setStringProperty("CONTROL_TYPE", "UPDATE"); updateMessage.setLongProperty("RECEIVED", received); updateMessage.setLongProperty("DURATION", time); - log.info("**** SENDING **** CLIENT_ID:" + _client + " RECEIVED:" + received + " DURATION:" + time); + if (debugLog.isInfoEnabled()) + { + debugLog.info("**** SENDING [" + received / _batchSize + "]**** " + + "CLIENT_ID:" + _client + " RECEIVED:" + received + " DURATION:" + time); + } + + // Output on the main log.info the details of this batch + if (received / _batchSize % 10 == 0) + { + log.info("Sending Report [" + received / _batchSize + "] " + + "CLIENT_ID:" + _client + " RECEIVED:" + received + " DURATION:" + time); + } _updater.send(updateMessage); } - } - class SustainedRateAdapter implements MessageListener + + /** + * This class is used here to adjust the _delay value which in turn is used to control the number of messages/second + * that are sent through the test system. + * + * By keeping a record of the messages recevied and the average time taken to process the batch size can be + * calculated and so the delay can be adjusted to maintain that rate. + * + * Given that delays of < 10ms can be rounded up the delay is only used between messages if the _delay > 10ms * no + * messages in the batch. Otherwise the delay is used at the end of the batch. + */ + class SustainedRateAdapter implements MessageListener, Runnable { private SustainedTestClient _client; - private long _variance = 250; //no. messages to allow drifting + private long _messageVariance = 500; //no. messages to allow drifting + private long _timeVariance = TEN_MILLI_SEC * 5; // no. nanos between send and report delay (10ms) private volatile long _delay; //in nanos private long _sent; private Map<String, Long> _slowClients = new HashMap<String, Long>(); - private static final long PAUSE_SLEEP = 10; // 10 ms - private static final long NO_CLIENT_SLEEP = 1000; // 1s - private static final long MAX_MESSAGE_DRIFT = 1000; // no messages drifted from producer + private static final long PAUSE_SLEEP = TEN_MILLI_SEC / 1000; // 10 ms + private static final long NO_CLIENT_SLEEP = 1000; // 1s private volatile boolean NO_CLIENTS = true; private int _delayShifting; - private static final int REPORTS_WITHOUT_CHANGE = 10; - private static final double MAXIMUM_DELAY_SHIFT = .02; //2% + private static final int REPORTS_WITHOUT_CHANGE = 5; + private boolean _warmedup = false; + private static final long EXPECTED_TIME_PER_BATCH = 100000L; SustainedRateAdapter(SustainedTestClient client) { @@ -381,9 +435,9 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti public void onMessage(Message message) { - if (log.isDebugEnabled()) + if (debugLog.isDebugEnabled()) { - log.debug("SustainedRateAdapter onMessage(Message message = " + message + "): called"); + debugLog.debug("SustainedRateAdapter onMessage(Message message = " + message + "): called"); } try @@ -395,15 +449,25 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti { NO_CLIENTS = false; long duration = message.getLongProperty("DURATION"); - long received = message.getLongProperty("RECEIVED"); + long totalReceived = message.getLongProperty("RECEIVED"); String client = message.getStringProperty("CLIENT_ID"); - log.info("**** SENDING **** CLIENT_ID:" + client + " RECEIVED:" + received + " DURATION:" + duration); + if (debugLog.isInfoEnabled()) + { + debugLog.info("Update Report: CLIENT_ID:" + client + " RECEIVED:" + totalReceived + " DURATION:" + duration); + } + + recordSlow(client, totalReceived); + adjustDelay(client, totalReceived, duration); - recordSlow(client, received); - adjustDelay(client, received, duration); + if (!_warmedup && _totalReceived / _batchSize / delays.size() == _warmUpBatches / 2) + { + _warmedup = true; + _warmup.countDown(); + + } } } catch (JMSException e) @@ -412,72 +476,220 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti } } - class Pair<X, Y> + CountDownLatch _warmup = new CountDownLatch(1); + + int _warmUpBatches = 20; + + int _numBatches = 10000; + + // long[] _timings = new long[_numBatches]; + private boolean _running = true; + + + public void run() + { + log.info("Warming up"); + + doBatch(_warmUpBatches); + + try + { + //wait for warmup to complete. + _warmup.await(); + + //set delay to the average length of the batches + _delay = _totalDuration / _warmUpBatches / delays.size(); + + log.info("Warmup complete delay set : " + _delay + + " based on _totalDuration: " + _totalDuration + + " over no. batches: " + _warmUpBatches + + " with client count: " + delays.size()); + + _totalDuration = 0L; + _totalReceived = 0L; + _sent = 0L; + } + catch (InterruptedException e) + { + // + } + + + doBatch(_numBatches); + + } + + private void doBatch(int batchSize) // long[] timings, { - X item1; - Y item2; + TextMessage testMessage = null; + try + { + testMessage = _client.session[0].createTextMessage("start"); + - Pair(X i1, Y i2) + for (int batch = 0; batch < batchSize; batch++) +// while (_running) + { + long start = System.nanoTime(); + + testMessage.setText("start"); + _client.producer.send(testMessage); + _rateAdapter.sentMessage(); + + testMessage.setText("test"); + //start at 2 so start and end count as part of batch + for (int m = 2; m < _batchSize; m++) + { + _client.producer.send(testMessage); + _rateAdapter.sentMessage(); + } + + testMessage.setText("end"); + _client.producer.send(testMessage); + _rateAdapter.sentMessage(); + + long end = System.nanoTime(); + + long sendtime = end - start; + + debugLog.info("Sent batch[" + batch + "](" + _batchSize + ") in " + sendtime);//timings[batch]); + + if (batch % 10 == 0) + { + log.info("Sent Batch[" + batch + "](" + _batchSize + ")" + status()); + } + + _rateAdapter.sleepBatch(); + + } + } + catch (JMSException e) { - item1 = i1; - item2 = i2; + log.error("Runner ended"); } + } - X getItem1() + private String status() + { + return " TotalDuration: " + _totalDuration + " for " + delays.size() + " consumers" + + " Delay is " + _delay + " resulting in " + + ((_delay > TEN_MILLI_SEC * _batchSize) ? (_delay / _batchSize) + "/msg" : _delay + "/batch"); + } + + private void sleepBatch() + { + if (checkForSlowClients()) + {//if there werwe slow clients we have already slept so don't sleep anymore again. + return; + } + + //Slow down if gap between send and received is too large + if (_sent - _totalReceived / delays.size() > _messageVariance) { - return item1; + //pause between batches. + debugLog.info("Sleeping to keep sent in check with received"); + log.debug("Increaseing _delay as sending more than receiving"); + _delay += TEN_MILLI_SEC; } - Y getItem2() + //per batch sleep.. if sleep is to small to spread over the batch. + if (_delay <= TEN_MILLI_SEC * _batchSize) { - return item2; + sleepLong(_delay); + } + else + { + debugLog.info("Not sleeping _delay > ten*batch is:" + _delay); } } - Map<String, Pair<Long, Long>> delays = new HashMap<String, Pair<Long, Long>>(); - Long totalReceived = 0L; - Long totalDuration = 0L; + public void stop() + { + _running = false; + } - private void adjustDelay(String client, long received, long duration) + Map<String, Long> delays = new HashMap<String, Long>(); + Long _totalReceived = 0L; + Long _totalDuration = 0L; + int _skipUpdate = 0; + + /** + * Adjust the delay for sending messages based on this update from the client + * + * @param client The client that send this update + * @param totalReceived The number of messages that this client has received. + * @param duration The time taken for the last batch of messagse + */ + private void adjustDelay(String client, long totalReceived, long duration) { - Pair<Long, Long> current = delays.get(client); + //Retrieve the current total time taken for this client. + Long currentTime = delays.get(client); - if (current == null) + // Add the new duration time to this client + if (currentTime == null) { - delays.put(client, new Pair<Long, Long>(received, duration)); + currentTime = duration; } else { - //reduce totals - totalReceived -= current.getItem1(); - totalDuration -= current.getItem2(); + currentTime += duration; } - totalReceived += received; - totalDuration += duration; + delays.put(client, currentTime); + + + _totalReceived += _batchSize; + _totalDuration += duration; - long averageDuration = totalDuration / delays.size(); + // Calculate the number of messages in the batch. + long batchCount = (_totalReceived / _batchSize); - long diff = Math.abs(_delay - averageDuration); + //calculate average duration accross clients per batch + long averageDuration = _totalDuration / delays.size() / batchCount; + + //calculate the difference between current send delay and average report delay + long diff = (duration) - averageDuration; + + if (debugLog.isInfoEnabled()) + { + debugLog.info("TotalDuration:" + _totalDuration + " for " + delays.size() + " consumers" + + " on batch: " + batchCount + + " Batch Duration: " + duration + + " Average: " + averageDuration + + " so diff: " + diff + " for : " + client + + " Delay is " + _delay + " resulting in " + + ((_delay > TEN_MILLI_SEC * _batchSize) + ? (_delay / _batchSize) + "/msg" : _delay + "/batch")); + } //if the averageDuration differs from the current by more than the specified variane then adjust delay. - if (diff > _variance) + if (Math.abs(diff) > _timeVariance) { - if (averageDuration > _delay) + + // if the the _delay is larger than the required duration to send report + // speed up + if (diff > TEN_MILLI_SEC) { - // we can go faster - _delay -= diff; + _delay -= TEN_MILLI_SEC; + if (_delay < 0) { _delay = 0; + debugLog.info("Reset _delay to 0"); + delayStable(); + } + else + { + delayChanged(); } + } - else + else if (diff < 0) // diff < 0 diff cannot be 0 as it is > _timeVariance { - // we need to slow down - _delay += diff; + // the report took longer + _delay += TEN_MILLI_SEC; + delayChanged(); } - delayChanged(); } else { @@ -486,11 +698,16 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti } + /** Reset the number of iterations before we say the delay has stabilised. */ private void delayChanged() { _delayShifting = REPORTS_WITHOUT_CHANGE; } + /** + * Record the fact that delay has stabilised If delay has stablised for REPORTS_WITHOUT_CHANGE then it will + * output Delay stabilised + */ private void delayStable() { _delayShifting--; @@ -498,14 +715,20 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti if (_delayShifting < 0) { _delayShifting = 0; - log.info("Delay stabilised:" + _delay); + log.debug("Delay stabilised:" + _delay); } } - // Record Slow clients + /** + * Checks that the client has received enough messages. If the client has fallen behind then they are put in the + * _slowClients lists which will increase the delay. + * + * @param client The client identifier to check + * @param received the number of messages received by that client + */ private void recordSlow(String client, long received) { - if (received < (_sent - _variance)) + if (received < (_sent - _messageVariance)) { _slowClients.put(client, received); } @@ -515,20 +738,49 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti } } + /** Incrment the number of sent messages and then sleep, if required. */ public void sentMessage() { - if (_sent % updateInterval == 0) + + _sent++; + + if (_delay > TEN_MILLI_SEC * _batchSize) { + long batchDelay = _delay / _batchSize; + // less than 10ms sleep doesn't always work. + // _delay is in nano seconds +// if (batchDelay < (TEN_MILLI_SEC)) +// { +// sleep(0, (int) batchDelay); +// } +// else + { +// if (batchDelay < 30000000000L) + { + sleepLong(batchDelay); + } + } + } + } + + /** + * Check at the end of each batch and pause sending messages to allow slow clients to catch up. + * + * @return true if there were slow clients that caught up. + */ + private boolean checkForSlowClients() + { + if (_sent % _batchSize == 0) + { // Cause test to pause when we have slow if (!_slowClients.isEmpty() || NO_CLIENTS) { - log.info("Pausing for slow clients"); - - //_delay <<= 1; + debugLog.info("Pausing for slow clients:" + _slowClients.entrySet().toArray()); while (!_slowClients.isEmpty()) { + debugLog.info(_slowClients.size() + " slow clients."); sleep(PAUSE_SLEEP); } @@ -537,45 +789,67 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti sleep(NO_CLIENT_SLEEP); } - log.debug("Continuing"); - return; + debugLog.debug("Continuing"); + return true; } else { - log.info("Delay:" + _delay); + debugLog.info("Delay:" + _delay); } + } - _sent++; + return false; + } - if (_delay > 0) - { - // less than 10ms sleep doesn't work. - // _delay is in nano seconds - if (_delay < 1000000) - { - sleep(0, (int) _delay); - } - else - { - if (_delay < 30000000000L) - { - sleep(_delay / 1000000, (int) (_delay % 1000000)); - } - } - } + /** + * Sleep normally takes micro-seconds this allows the use of a nano-second value. + * + * @param delay nanoseconds to sleep for. + */ + private void sleepLong(long delay) + { + sleep(delay / 1000000, (int) (delay % 1000000)); } + /** + * Sleep for the specified micro-seconds. + * @param sleep microseconds to sleep for. + */ private void sleep(long sleep) { sleep(sleep, 0); } + /** + * Perform the sleep , swallowing any InteruptException. + * + * NOTE: If a sleep request is > 10s then reset only sleep for 5s + * + * @param milli to sleep for + * @param nano sub miliseconds to sleep for + */ private void sleep(long milli, int nano) { try { - log.debug("Sleep:" + milli + ":" + nano); + debugLog.debug("Sleep:" + milli + ":" + nano); + if (milli > 10000) + { + + if (_delay == milli) + { + _totalDuration = _totalReceived / _batchSize * EXPECTED_TIME_PER_BATCH; + debugLog.error("Sleeping for more than 10 seconds adjusted to 5s!:" + milli / 1000 + "s. Reset _totalDuration:" + _totalDuration); + } + else + { + debugLog.error("Sleeping for more than 10 seconds adjusted to 5s!:" + milli / 1000 + "s"); + } + + milli = 5000; + } + Thread.sleep(milli, nano); } catch (InterruptedException e) @@ -583,6 +857,12 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti // } } + + public void setClient(SustainedTestClient client) + { + _client = client; + } } } + diff --git a/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestCoordinator.java b/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestCoordinator.java index 4081d87192..b437e165b4 100644 --- a/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestCoordinator.java +++ b/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestCoordinator.java @@ -78,11 +78,12 @@ public class SustainedTestCoordinator extends CoordinatingTestCase3BasicPubSub i Map<String, Object> testConfig = new HashMap<String, Object>(); testConfig.put("TEST_NAME", "Perf_SustainedPubSub"); testConfig.put("SUSTAINED_KEY", SUSTAINED_KEY); - //testConfig.put("SUSTAINED_MSG_RATE", 10); - testConfig.put("SUSTAINED_NUM_RECEIVERS", 2); - testConfig.put("SUSTAINED_UPDATE_INTERVAL", 25); + testConfig.put("SUSTAINED_NUM_RECEIVERS", Integer.getInteger("numReceives", 2)); + testConfig.put("SUSTAINED_UPDATE_INTERVAL", Integer.getInteger("batchSize", 1000)); testConfig.put("SUSTAINED_UPDATE_KEY", SUSTAINED_KEY + ".UPDATE"); - testConfig.put("ACKNOWLEDGE_MODE", AMQSession.NO_ACKNOWLEDGE); + testConfig.put("ACKNOWLEDGE_MODE", Integer.getInteger("ackMode", AMQSession.AUTO_ACKNOWLEDGE)); + + log.info("Created Config: " + testConfig.entrySet().toArray()); sequenceTest(testConfig); } diff --git a/java/integrationtests/src/main/java/org/apache/qpid/util/ConversationFactory.java b/java/integrationtests/src/main/java/org/apache/qpid/util/ConversationFactory.java index 4ca2fe8ff5..0090bec3d0 100644 --- a/java/integrationtests/src/main/java/org/apache/qpid/util/ConversationFactory.java +++ b/java/integrationtests/src/main/java/org/apache/qpid/util/ConversationFactory.java @@ -20,16 +20,16 @@ */
package org.apache.qpid.util;
+import org.apache.log4j.Logger;
+
+import javax.jms.*;
+
import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
-import javax.jms.*;
-
-import org.apache.log4j.Logger;
-
/**
* A conversation helper, uses a message correlation id pattern to match up sent and received messages as a conversation
* over JMS messaging. Incoming message traffic is divided up by correlation id. Each id has a queue (behaviour dependant
@@ -153,7 +153,7 @@ public class ConversationFactory * queue.
* @param queueClass The queue implementation class.
*
- * @throws JMSException All undelying JMSExceptions are allowed to fall through.
+ * @throws JMSException All underlying JMSExceptions are allowed to fall through.
*/
public ConversationFactory(Connection connection, Destination receiveDestination,
Class<? extends BlockingQueue> queueClass) throws JMSException
|