diff options
author | Martin Ritchie <ritchiem@apache.org> | 2007-06-27 10:51:34 +0000 |
---|---|---|
committer | Martin Ritchie <ritchiem@apache.org> | 2007-06-27 10:51:34 +0000 |
commit | 0b462f3d4fe0b4703e19d9ff7b35343f8caa1de6 (patch) | |
tree | ff9abef50d9aee8085aeeae6de12c2390ce18009 | |
parent | d2a9e42e20edbfd0db53c75a4f0511547ec70319 (diff) | |
download | qpid-python-0b462f3d4fe0b4703e19d9ff7b35343f8caa1de6.tar.gz |
Update to the sustained test to ensure late joining occurs correctly and improved stabilisation. Additional system properties now documented on wiki.
http://cwiki.apache.org/qpid/sustained-tests.html
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2@551117 13f79535-47bb-0310-9956-ffa450edef68
4 files changed, 128 insertions, 61 deletions
diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/TestClient.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/TestClient.java index a82b05e20f..a904bfa419 100644 --- a/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/TestClient.java +++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/TestClient.java @@ -252,7 +252,8 @@ public class TestClient implements MessageListener public static Connection createConnection(String connectionPropsResource, String clientID, String brokerUrl, String virtualHost)
{
log.debug("public static Connection createConnection(String connectionPropsResource = " + connectionPropsResource
- + ", String brokerUrl = " + brokerUrl + ", String virtualHost = " + virtualHost + "): called");
+ + ", String brokerUrl = " + brokerUrl + ", String clientID = " + clientID
+ + ", String virtualHost = " + virtualHost + " ): called");
try
{
diff --git a/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestClient.java b/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestClient.java index 1597da6dba..6f2089290a 100644 --- a/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestClient.java +++ b/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestClient.java @@ -37,6 +37,7 @@ import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import java.util.HashMap; +import java.util.Iterator; import java.util.Map; import java.util.concurrent.CountDownLatch; @@ -91,7 +92,9 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti private static final long TEN_MILLI_SEC = 10000000; - private static final long FIVE_MILLI_SEC = 5000000; + private static final int DEBUG_LOG_UPATE_INTERVAL = 10; + private static final int LOG_UPATE_INTERVAL = 10; + private static final boolean SLEEP_PER_MESSAGE = Boolean.getBoolean("sleepPerMessage"); /** * Should provide the name of the test case that this class implements. The exact names are defined in the interop @@ -129,6 +132,7 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti String sendKey = assignRoleMessage.getStringProperty("SUSTAINED_KEY"); String sendUpdateKey = assignRoleMessage.getStringProperty("SUSTAINED_UPDATE_KEY"); int ackMode = assignRoleMessage.getIntProperty("ACKNOWLEDGE_MODE"); + String clientName = assignRoleMessage.getStringProperty("CLIENT_NAME"); if (debugLog.isDebugEnabled()) { @@ -150,7 +154,9 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti session = new Session[1]; connection[0] = - org.apache.qpid.interop.testclient.TestClient.createConnection(org.apache.qpid.interop.testclient.TestClient.DEFAULT_CONNECTION_PROPS_RESOURCE, org.apache.qpid.interop.testclient.TestClient.brokerUrl, + org.apache.qpid.interop.testclient.TestClient.createConnection(org.apache.qpid.interop.testclient.TestClient.DEFAULT_CONNECTION_PROPS_RESOURCE, + clientName, + org.apache.qpid.interop.testclient.TestClient.brokerUrl, org.apache.qpid.interop.testclient.TestClient.virtualHost); session[0] = connection[0].createSession(false, ackMode); @@ -182,6 +188,7 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti { connection[i] = org.apache.qpid.interop.testclient.TestClient.createConnection(org.apache.qpid.interop.testclient.TestClient.DEFAULT_CONNECTION_PROPS_RESOURCE, + clientName, org.apache.qpid.interop.testclient.TestClient.brokerUrl, org.apache.qpid.interop.testclient.TestClient.virtualHost); session[i] = connection[i].createSession(false, ackMode); @@ -192,7 +199,7 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti MessageConsumer consumer = session[i].createConsumer(sendDestination); - consumer.setMessageListener(new SustainedListener(TestClient.CLIENT_NAME + "-" + i, _batchSize, session[i], sendUpdateDestination)); + consumer.setMessageListener(new SustainedListener(clientName + "-" + i, _batchSize, session[i], sendUpdateDestination)); } break; @@ -347,7 +354,7 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti _received++; if (((TextMessage) message).getText().equals("start")) { - debugLog.info("Starting Batch"); + debugLog.debug("Starting Batch"); _startTime = System.nanoTime(); } else if (((TextMessage) message).getText().equals("end")) @@ -355,8 +362,8 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti if (_startTime != null) { long currentTime = System.nanoTime(); - sendStatus(currentTime - _startTime, _received); - debugLog.info("End Batch"); + sendStatus(currentTime - _startTime, _received, message.getIntProperty("BATCH")); + debugLog.debug("End Batch"); } } } @@ -373,28 +380,31 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti * * @param time taken for the the last batch * @param received Total number of messages received. - * + * @param batchNumber the batch number * @throws JMSException if an error occurs during the send */ - private void sendStatus(long time, long received) throws JMSException + private void sendStatus(long time, long received, int batchNumber) throws JMSException { Message updateMessage = _session.createTextMessage("update"); updateMessage.setStringProperty("CLIENT_ID", ":" + _client); updateMessage.setStringProperty("CONTROL_TYPE", "UPDATE"); updateMessage.setLongProperty("RECEIVED", received); + updateMessage.setIntProperty("BATCH", batchNumber); updateMessage.setLongProperty("DURATION", time); if (debugLog.isInfoEnabled()) { - debugLog.info("**** SENDING [" + received / _batchSize + "]**** " - + "CLIENT_ID:" + _client + " RECEIVED:" + received + " DURATION:" + time); + debugLog.info("**** SENDING [" + batchNumber + "]**** " + + "CLIENT_ID:" + _client + " RECEIVED:" + received + + " BATCH:" + batchNumber + " DURATION:" + time); } // Output on the main log.info the details of this batch - if (received / _batchSize % 10 == 0) + if (batchNumber % 10 == 0) { - log.info("Sending Report [" + received / _batchSize + "] " - + "CLIENT_ID:" + _client + " RECEIVED:" + received + " DURATION:" + time); + log.info("Sending Report [" + batchNumber + "] " + + "CLIENT_ID:" + _client + " RECEIVED:" + received + + " BATCH:" + batchNumber + " DURATION:" + time); } _updater.send(updateMessage); @@ -415,7 +425,7 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti class SustainedRateAdapter implements MessageListener, Runnable { private SustainedTestClient _client; - private long _messageVariance = 500; //no. messages to allow drifting + private long _batchVariance = 3; //no. batches to allow drifting private long _timeVariance = TEN_MILLI_SEC * 5; // no. nanos between send and report delay (10ms) private volatile long _delay; //in nanos private long _sent; @@ -451,18 +461,23 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti long duration = message.getLongProperty("DURATION"); long totalReceived = message.getLongProperty("RECEIVED"); String client = message.getStringProperty("CLIENT_ID"); + int batchNumber = message.getIntProperty("BATCH"); - if (debugLog.isInfoEnabled()) + if (debugLog.isInfoEnabled() && batchNumber % DEBUG_LOG_UPATE_INTERVAL == 0) { - debugLog.info("Update Report: CLIENT_ID:" + client + " RECEIVED:" + totalReceived + " DURATION:" + duration); + debugLog.info("Update Report: CLIENT_ID:" + client + " RECEIVED:" + totalReceived + + " Recevied BATCH:" + batchNumber + " DURATION:" + duration); } - recordSlow(client, totalReceived); - - adjustDelay(client, totalReceived, duration); + recordSlow(client, totalReceived, batchNumber); + adjustDelay(client, batchNumber, duration); - if (!_warmedup && _totalReceived / _batchSize / delays.size() == _warmUpBatches / 2) + // Warm up completes when: + // we haven't warmed up + // and the number of batches sent to each client is at least half of the required warmup batches + if (!_warmedup + && (batchNumber >= _warmUpBatches)) { _warmedup = true; _warmup.countDown(); @@ -478,7 +493,7 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti CountDownLatch _warmup = new CountDownLatch(1); - int _warmUpBatches = 20; + int _warmUpBatches = Integer.getInteger("warmUpBatches", 10); int _numBatches = 10000; @@ -527,12 +542,14 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti testMessage = _client.session[0].createTextMessage("start"); - for (int batch = 0; batch < batchSize; batch++) + for (int batch = 0; batch <= batchSize; batch++) // while (_running) { long start = System.nanoTime(); testMessage.setText("start"); + testMessage.setIntProperty("BATCH", batch); + _client.producer.send(testMessage); _rateAdapter.sentMessage(); @@ -552,9 +569,12 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti long sendtime = end - start; - debugLog.info("Sent batch[" + batch + "](" + _batchSize + ") in " + sendtime);//timings[batch]); + if (debugLog.isDebugEnabled()) + { + debugLog.info("Sent batch[" + batch + "](" + _batchSize + ") in " + sendtime);//timings[batch]); + } - if (batch % 10 == 0) + if (batch % LOG_UPATE_INTERVAL == 0) { log.info("Sent Batch[" + batch + "](" + _batchSize + ")" + status()); } @@ -583,23 +603,17 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti return; } - //Slow down if gap between send and received is too large - if (_sent - _totalReceived / delays.size() > _messageVariance) - { - //pause between batches. - debugLog.info("Sleeping to keep sent in check with received"); - log.debug("Increaseing _delay as sending more than receiving"); - _delay += TEN_MILLI_SEC; - } - - //per batch sleep.. if sleep is to small to spread over the batch. - if (_delay <= TEN_MILLI_SEC * _batchSize) - { - sleepLong(_delay); - } - else + if (!SLEEP_PER_MESSAGE) { - debugLog.info("Not sleeping _delay > ten*batch is:" + _delay); + //per batch sleep.. if sleep is to small to spread over the batch. + if (_delay <= TEN_MILLI_SEC * _batchSize) + { + sleepLong(_delay); + } + else + { + debugLog.info("Not sleeping _delay > ten*batch is:" + _delay); + } } } @@ -617,10 +631,10 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti * Adjust the delay for sending messages based on this update from the client * * @param client The client that send this update - * @param totalReceived The number of messages that this client has received. * @param duration The time taken for the last batch of messagse + * @param batchNumber The reported batchnumber from the client */ - private void adjustDelay(String client, long totalReceived, long duration) + private void adjustDelay(String client, int batchNumber, long duration) { //Retrieve the current total time taken for this client. Long currentTime = delays.get(client); @@ -637,23 +651,28 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti delays.put(client, currentTime); + long batchesSent = _sent / _batchSize; + + // ensure we don't divide by zero + if (batchesSent == 0) + { + batchesSent = 1L; + } _totalReceived += _batchSize; _totalDuration += duration; - // Calculate the number of messages in the batch. - long batchCount = (_totalReceived / _batchSize); - //calculate average duration accross clients per batch - long averageDuration = _totalDuration / delays.size() / batchCount; + long averageDuration = _totalDuration / delays.size() / batchesSent; //calculate the difference between current send delay and average report delay long diff = (duration) - averageDuration; - if (debugLog.isInfoEnabled()) + if (debugLog.isInfoEnabled() && batchNumber % DEBUG_LOG_UPATE_INTERVAL == 0) { - debugLog.info("TotalDuration:" + _totalDuration + " for " + delays.size() + " consumers" - + " on batch: " + batchCount + debugLog.info("TotalDuration:" + _totalDuration + " for " + delays.size() + " consumers." + + " on batch: " + batchesSent + + " received batch: " + batchNumber + " Batch Duration: " + duration + " Average: " + averageDuration + " so diff: " + diff + " for : " + client @@ -696,6 +715,16 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti delayStable(); } + // If we have a consumer that is behind with the batches. + if (batchesSent - batchNumber > _batchVariance) + { + debugLog.debug("Increasing _delay as sending more than receiving"); + + _delay += 2 * TEN_MILLI_SEC; + delayChanged(); + } + + } /** Reset the number of iterations before we say the delay has stabilised. */ @@ -725,10 +754,11 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti * * @param client The client identifier to check * @param received the number of messages received by that client + * @param batchNumber */ - private void recordSlow(String client, long received) + private void recordSlow(String client, long received, int batchNumber) { - if (received < (_sent - _messageVariance)) + if (Math.abs(batchNumber - (_sent / _batchSize)) > _batchVariance) { _slowClients.put(client, received); } @@ -761,6 +791,13 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti } } } + else + { + if (SLEEP_PER_MESSAGE && (_delay > 0)) + { + sleepLong(_delay / _batchSize); + } + } } @@ -771,16 +808,38 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti */ private boolean checkForSlowClients() { - if (_sent % _batchSize == 0) + // This will allways be true as we are running this at the end of each batchSize +// if (_sent % _batchSize == 0) { // Cause test to pause when we have slow if (!_slowClients.isEmpty() || NO_CLIENTS) { - debugLog.info("Pausing for slow clients:" + _slowClients.entrySet().toArray()); + while (!_slowClients.isEmpty()) { - debugLog.info(_slowClients.size() + " slow clients."); + if (debugLog.isInfoEnabled() + && _sent / _batchSize % DEBUG_LOG_UPATE_INTERVAL == 0) + { + String clients = ""; + Iterator it = _slowClients.keySet().iterator(); + while (it.hasNext()) + { + clients += it.next(); + if (it.hasNext()) + { + clients += ", "; + } + } + debugLog.info("Pausing for slow clients:" + clients); + } + + + if (log.isDebugEnabled() + && _sent / _batchSize % LOG_UPATE_INTERVAL == 0) + { + log.debug(_slowClients.size() + " slow clients."); + } sleep(PAUSE_SLEEP); } @@ -794,7 +853,11 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti } else { - debugLog.info("Delay:" + _delay); + if (_sent / _batchSize % LOG_UPATE_INTERVAL == 0) + { + log.info("Total Delay :" + _delay + " " + + (_delayShifting == 0 ? "Stablised" : "Not Stablised(" + _delayShifting + ")")); + } } } @@ -825,7 +888,7 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti * Perform the sleep , swallowing any InteruptException. * * NOTE: If a sleep request is > 10s then reset only sleep for 5s - * + * * @param milli to sleep for * @param nano sub miliseconds to sleep for */ diff --git a/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestCoordinator.java b/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestCoordinator.java index b437e165b4..0075e45a8c 100644 --- a/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestCoordinator.java +++ b/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestCoordinator.java @@ -113,6 +113,7 @@ public class SustainedTestCoordinator extends CoordinatingTestCase3BasicPubSub i setPropertiesOnMessage(assignSender, testProperties); assignSender.setStringProperty("CONTROL_TYPE", "ASSIGN_ROLE"); assignSender.setStringProperty("ROLE", "SENDER"); + assignSender.setStringProperty("CLIENT_NAME", "Sustained_SENDER"); senderConversation.send(senderControlTopic, assignSender); @@ -170,6 +171,7 @@ public class SustainedTestCoordinator extends CoordinatingTestCase3BasicPubSub i setPropertiesOnMessage(assignReceiver, _testProperties); assignReceiver.setStringProperty("CONTROL_TYPE", "ASSIGN_ROLE"); assignReceiver.setStringProperty("ROLE", "RECEIVER"); + assignReceiver.setStringProperty("CLIENT_NAME", "Sustained_RECEIVER_" + receiver.clientName); receiverConversation.send(receiverControlTopic, assignReceiver); diff --git a/java/integrationtests/src/resources/sustained-log4j.xml b/java/integrationtests/src/resources/sustained-log4j.xml index f2c280e180..c5ab3137bf 100644 --- a/java/integrationtests/src/resources/sustained-log4j.xml +++ b/java/integrationtests/src/resources/sustained-log4j.xml @@ -35,11 +35,12 @@ <appender name="STDOUT" class="org.apache.log4j.ConsoleAppender"> <layout class="org.apache.log4j.PatternLayout"> - <param name="ConversionPattern" value="%d %-5p [%t] %C{2} (%F:%L) - %m%n"/> + <param name="ConversionPattern" value="%d %-5p (%F:%L) - %m%n"/> + <!--param name="ConversionPattern" value="%d %-5p [%t] %C{2} (%F:%L) - %m%n"/--> </layout> </appender> - <category name="SustainedTest"> + <category name="SustainedTest"> <priority value="${sustained.level}"/> </category> @@ -48,8 +49,8 @@ </category> <category name="org.apache.qpid.interop"> - <priority value="${interop.logging.level}"/> - </category> + <priority value="${interop.logging.level}"/> + </category> <category name="org.apache.qpid.sustained"> |