diff options
author | Martin Ritchie <ritchiem@apache.org> | 2007-06-27 15:49:51 +0000 |
---|---|---|
committer | Martin Ritchie <ritchiem@apache.org> | 2007-06-27 15:49:51 +0000 |
commit | 9bac85e36d60df28fb6f86e3bbe9e3f46689aa04 (patch) | |
tree | 593c9d3d5cd46191099b558d0ef2d3ef012f968d /java/integrationtests | |
parent | f10117cd6464a107b086e0b7f7ea44a496b04c3d (diff) | |
download | qpid-python-9bac85e36d60df28fb6f86e3bbe9e3f46689aa04.tar.gz |
Merged revisions 550748-551121 via svnmerge from
https://svn.apache.org/repos/asf/incubator/qpid/branches/M2
........
r550748 | ritchiem | 2007-06-26 10:20:17 +0100 (Tue, 26 Jun 2007) | 1 line
Added xml file for logging during sustained tests
........
r550773 | rupertlssmith | 2007-06-26 12:03:04 +0100 (Tue, 26 Jun 2007) | 1 line
Immediate and mandatory flag tests added.
........
r550849 | rupertlssmith | 2007-06-26 17:43:58 +0100 (Tue, 26 Jun 2007) | 1 line
QPID-509 Mandatory messages not returned outside a transaction. They are now.
........
r551117 | ritchiem | 2007-06-27 11:51:34 +0100 (Wed, 27 Jun 2007) | 2 lines
Update to the sustained test to ensure late joining occurs correctly and improved stabilisation. Additional system properties now documented on wiki.
http://cwiki.apache.org/qpid/sustained-tests.html
........
r551118 | ritchiem | 2007-06-27 11:51:51 +0100 (Wed, 27 Jun 2007) | 1 line
Added intelij files to ignore list
........
r551119 | ritchiem | 2007-06-27 11:55:34 +0100 (Wed, 27 Jun 2007) | 1 line
POM update to add Apache content to built jars
........
r551120 | ritchiem | 2007-06-27 11:58:25 +0100 (Wed, 27 Jun 2007) | 1 line
Updated default guest password as it was not correct.
........
r551121 | ritchiem | 2007-06-27 12:00:48 +0100 (Wed, 27 Jun 2007) | 1 line
Added additional information to log message when available to aid the explination of a failed connection
........
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@551207 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/integrationtests')
4 files changed, 192 insertions, 57 deletions
diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/TestClient.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/TestClient.java index a82b05e20f..a904bfa419 100644 --- a/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/TestClient.java +++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/TestClient.java @@ -252,7 +252,8 @@ public class TestClient implements MessageListener public static Connection createConnection(String connectionPropsResource, String clientID, String brokerUrl, String virtualHost)
{
log.debug("public static Connection createConnection(String connectionPropsResource = " + connectionPropsResource
- + ", String brokerUrl = " + brokerUrl + ", String virtualHost = " + virtualHost + "): called");
+ + ", String brokerUrl = " + brokerUrl + ", String clientID = " + clientID
+ + ", String virtualHost = " + virtualHost + " ): called");
try
{
diff --git a/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestClient.java b/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestClient.java index 1597da6dba..6f2089290a 100644 --- a/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestClient.java +++ b/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestClient.java @@ -37,6 +37,7 @@ import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import java.util.HashMap; +import java.util.Iterator; import java.util.Map; import java.util.concurrent.CountDownLatch; @@ -91,7 +92,9 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti private static final long TEN_MILLI_SEC = 10000000; - private static final long FIVE_MILLI_SEC = 5000000; + private static final int DEBUG_LOG_UPATE_INTERVAL = 10; + private static final int LOG_UPATE_INTERVAL = 10; + private static final boolean SLEEP_PER_MESSAGE = Boolean.getBoolean("sleepPerMessage"); /** * Should provide the name of the test case that this class implements. The exact names are defined in the interop @@ -129,6 +132,7 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti String sendKey = assignRoleMessage.getStringProperty("SUSTAINED_KEY"); String sendUpdateKey = assignRoleMessage.getStringProperty("SUSTAINED_UPDATE_KEY"); int ackMode = assignRoleMessage.getIntProperty("ACKNOWLEDGE_MODE"); + String clientName = assignRoleMessage.getStringProperty("CLIENT_NAME"); if (debugLog.isDebugEnabled()) { @@ -150,7 +154,9 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti session = new Session[1]; connection[0] = - org.apache.qpid.interop.testclient.TestClient.createConnection(org.apache.qpid.interop.testclient.TestClient.DEFAULT_CONNECTION_PROPS_RESOURCE, org.apache.qpid.interop.testclient.TestClient.brokerUrl, + org.apache.qpid.interop.testclient.TestClient.createConnection(org.apache.qpid.interop.testclient.TestClient.DEFAULT_CONNECTION_PROPS_RESOURCE, + clientName, + org.apache.qpid.interop.testclient.TestClient.brokerUrl, org.apache.qpid.interop.testclient.TestClient.virtualHost); session[0] = connection[0].createSession(false, ackMode); @@ -182,6 +188,7 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti { connection[i] = org.apache.qpid.interop.testclient.TestClient.createConnection(org.apache.qpid.interop.testclient.TestClient.DEFAULT_CONNECTION_PROPS_RESOURCE, + clientName, org.apache.qpid.interop.testclient.TestClient.brokerUrl, org.apache.qpid.interop.testclient.TestClient.virtualHost); session[i] = connection[i].createSession(false, ackMode); @@ -192,7 +199,7 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti MessageConsumer consumer = session[i].createConsumer(sendDestination); - consumer.setMessageListener(new SustainedListener(TestClient.CLIENT_NAME + "-" + i, _batchSize, session[i], sendUpdateDestination)); + consumer.setMessageListener(new SustainedListener(clientName + "-" + i, _batchSize, session[i], sendUpdateDestination)); } break; @@ -347,7 +354,7 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti _received++; if (((TextMessage) message).getText().equals("start")) { - debugLog.info("Starting Batch"); + debugLog.debug("Starting Batch"); _startTime = System.nanoTime(); } else if (((TextMessage) message).getText().equals("end")) @@ -355,8 +362,8 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti if (_startTime != null) { long currentTime = System.nanoTime(); - sendStatus(currentTime - _startTime, _received); - debugLog.info("End Batch"); + sendStatus(currentTime - _startTime, _received, message.getIntProperty("BATCH")); + debugLog.debug("End Batch"); } } } @@ -373,28 +380,31 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti * * @param time taken for the the last batch * @param received Total number of messages received. - * + * @param batchNumber the batch number * @throws JMSException if an error occurs during the send */ - private void sendStatus(long time, long received) throws JMSException + private void sendStatus(long time, long received, int batchNumber) throws JMSException { Message updateMessage = _session.createTextMessage("update"); updateMessage.setStringProperty("CLIENT_ID", ":" + _client); updateMessage.setStringProperty("CONTROL_TYPE", "UPDATE"); updateMessage.setLongProperty("RECEIVED", received); + updateMessage.setIntProperty("BATCH", batchNumber); updateMessage.setLongProperty("DURATION", time); if (debugLog.isInfoEnabled()) { - debugLog.info("**** SENDING [" + received / _batchSize + "]**** " - + "CLIENT_ID:" + _client + " RECEIVED:" + received + " DURATION:" + time); + debugLog.info("**** SENDING [" + batchNumber + "]**** " + + "CLIENT_ID:" + _client + " RECEIVED:" + received + + " BATCH:" + batchNumber + " DURATION:" + time); } // Output on the main log.info the details of this batch - if (received / _batchSize % 10 == 0) + if (batchNumber % 10 == 0) { - log.info("Sending Report [" + received / _batchSize + "] " - + "CLIENT_ID:" + _client + " RECEIVED:" + received + " DURATION:" + time); + log.info("Sending Report [" + batchNumber + "] " + + "CLIENT_ID:" + _client + " RECEIVED:" + received + + " BATCH:" + batchNumber + " DURATION:" + time); } _updater.send(updateMessage); @@ -415,7 +425,7 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti class SustainedRateAdapter implements MessageListener, Runnable { private SustainedTestClient _client; - private long _messageVariance = 500; //no. messages to allow drifting + private long _batchVariance = 3; //no. batches to allow drifting private long _timeVariance = TEN_MILLI_SEC * 5; // no. nanos between send and report delay (10ms) private volatile long _delay; //in nanos private long _sent; @@ -451,18 +461,23 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti long duration = message.getLongProperty("DURATION"); long totalReceived = message.getLongProperty("RECEIVED"); String client = message.getStringProperty("CLIENT_ID"); + int batchNumber = message.getIntProperty("BATCH"); - if (debugLog.isInfoEnabled()) + if (debugLog.isInfoEnabled() && batchNumber % DEBUG_LOG_UPATE_INTERVAL == 0) { - debugLog.info("Update Report: CLIENT_ID:" + client + " RECEIVED:" + totalReceived + " DURATION:" + duration); + debugLog.info("Update Report: CLIENT_ID:" + client + " RECEIVED:" + totalReceived + + " Recevied BATCH:" + batchNumber + " DURATION:" + duration); } - recordSlow(client, totalReceived); - - adjustDelay(client, totalReceived, duration); + recordSlow(client, totalReceived, batchNumber); + adjustDelay(client, batchNumber, duration); - if (!_warmedup && _totalReceived / _batchSize / delays.size() == _warmUpBatches / 2) + // Warm up completes when: + // we haven't warmed up + // and the number of batches sent to each client is at least half of the required warmup batches + if (!_warmedup + && (batchNumber >= _warmUpBatches)) { _warmedup = true; _warmup.countDown(); @@ -478,7 +493,7 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti CountDownLatch _warmup = new CountDownLatch(1); - int _warmUpBatches = 20; + int _warmUpBatches = Integer.getInteger("warmUpBatches", 10); int _numBatches = 10000; @@ -527,12 +542,14 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti testMessage = _client.session[0].createTextMessage("start"); - for (int batch = 0; batch < batchSize; batch++) + for (int batch = 0; batch <= batchSize; batch++) // while (_running) { long start = System.nanoTime(); testMessage.setText("start"); + testMessage.setIntProperty("BATCH", batch); + _client.producer.send(testMessage); _rateAdapter.sentMessage(); @@ -552,9 +569,12 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti long sendtime = end - start; - debugLog.info("Sent batch[" + batch + "](" + _batchSize + ") in " + sendtime);//timings[batch]); + if (debugLog.isDebugEnabled()) + { + debugLog.info("Sent batch[" + batch + "](" + _batchSize + ") in " + sendtime);//timings[batch]); + } - if (batch % 10 == 0) + if (batch % LOG_UPATE_INTERVAL == 0) { log.info("Sent Batch[" + batch + "](" + _batchSize + ")" + status()); } @@ -583,23 +603,17 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti return; } - //Slow down if gap between send and received is too large - if (_sent - _totalReceived / delays.size() > _messageVariance) - { - //pause between batches. - debugLog.info("Sleeping to keep sent in check with received"); - log.debug("Increaseing _delay as sending more than receiving"); - _delay += TEN_MILLI_SEC; - } - - //per batch sleep.. if sleep is to small to spread over the batch. - if (_delay <= TEN_MILLI_SEC * _batchSize) - { - sleepLong(_delay); - } - else + if (!SLEEP_PER_MESSAGE) { - debugLog.info("Not sleeping _delay > ten*batch is:" + _delay); + //per batch sleep.. if sleep is to small to spread over the batch. + if (_delay <= TEN_MILLI_SEC * _batchSize) + { + sleepLong(_delay); + } + else + { + debugLog.info("Not sleeping _delay > ten*batch is:" + _delay); + } } } @@ -617,10 +631,10 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti * Adjust the delay for sending messages based on this update from the client * * @param client The client that send this update - * @param totalReceived The number of messages that this client has received. * @param duration The time taken for the last batch of messagse + * @param batchNumber The reported batchnumber from the client */ - private void adjustDelay(String client, long totalReceived, long duration) + private void adjustDelay(String client, int batchNumber, long duration) { //Retrieve the current total time taken for this client. Long currentTime = delays.get(client); @@ -637,23 +651,28 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti delays.put(client, currentTime); + long batchesSent = _sent / _batchSize; + + // ensure we don't divide by zero + if (batchesSent == 0) + { + batchesSent = 1L; + } _totalReceived += _batchSize; _totalDuration += duration; - // Calculate the number of messages in the batch. - long batchCount = (_totalReceived / _batchSize); - //calculate average duration accross clients per batch - long averageDuration = _totalDuration / delays.size() / batchCount; + long averageDuration = _totalDuration / delays.size() / batchesSent; //calculate the difference between current send delay and average report delay long diff = (duration) - averageDuration; - if (debugLog.isInfoEnabled()) + if (debugLog.isInfoEnabled() && batchNumber % DEBUG_LOG_UPATE_INTERVAL == 0) { - debugLog.info("TotalDuration:" + _totalDuration + " for " + delays.size() + " consumers" - + " on batch: " + batchCount + debugLog.info("TotalDuration:" + _totalDuration + " for " + delays.size() + " consumers." + + " on batch: " + batchesSent + + " received batch: " + batchNumber + " Batch Duration: " + duration + " Average: " + averageDuration + " so diff: " + diff + " for : " + client @@ -696,6 +715,16 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti delayStable(); } + // If we have a consumer that is behind with the batches. + if (batchesSent - batchNumber > _batchVariance) + { + debugLog.debug("Increasing _delay as sending more than receiving"); + + _delay += 2 * TEN_MILLI_SEC; + delayChanged(); + } + + } /** Reset the number of iterations before we say the delay has stabilised. */ @@ -725,10 +754,11 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti * * @param client The client identifier to check * @param received the number of messages received by that client + * @param batchNumber */ - private void recordSlow(String client, long received) + private void recordSlow(String client, long received, int batchNumber) { - if (received < (_sent - _messageVariance)) + if (Math.abs(batchNumber - (_sent / _batchSize)) > _batchVariance) { _slowClients.put(client, received); } @@ -761,6 +791,13 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti } } } + else + { + if (SLEEP_PER_MESSAGE && (_delay > 0)) + { + sleepLong(_delay / _batchSize); + } + } } @@ -771,16 +808,38 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti */ private boolean checkForSlowClients() { - if (_sent % _batchSize == 0) + // This will allways be true as we are running this at the end of each batchSize +// if (_sent % _batchSize == 0) { // Cause test to pause when we have slow if (!_slowClients.isEmpty() || NO_CLIENTS) { - debugLog.info("Pausing for slow clients:" + _slowClients.entrySet().toArray()); + while (!_slowClients.isEmpty()) { - debugLog.info(_slowClients.size() + " slow clients."); + if (debugLog.isInfoEnabled() + && _sent / _batchSize % DEBUG_LOG_UPATE_INTERVAL == 0) + { + String clients = ""; + Iterator it = _slowClients.keySet().iterator(); + while (it.hasNext()) + { + clients += it.next(); + if (it.hasNext()) + { + clients += ", "; + } + } + debugLog.info("Pausing for slow clients:" + clients); + } + + + if (log.isDebugEnabled() + && _sent / _batchSize % LOG_UPATE_INTERVAL == 0) + { + log.debug(_slowClients.size() + " slow clients."); + } sleep(PAUSE_SLEEP); } @@ -794,7 +853,11 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti } else { - debugLog.info("Delay:" + _delay); + if (_sent / _batchSize % LOG_UPATE_INTERVAL == 0) + { + log.info("Total Delay :" + _delay + " " + + (_delayShifting == 0 ? "Stablised" : "Not Stablised(" + _delayShifting + ")")); + } } } @@ -825,7 +888,7 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti * Perform the sleep , swallowing any InteruptException. * * NOTE: If a sleep request is > 10s then reset only sleep for 5s - * + * * @param milli to sleep for * @param nano sub miliseconds to sleep for */ diff --git a/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestCoordinator.java b/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestCoordinator.java index b437e165b4..0075e45a8c 100644 --- a/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestCoordinator.java +++ b/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestCoordinator.java @@ -113,6 +113,7 @@ public class SustainedTestCoordinator extends CoordinatingTestCase3BasicPubSub i setPropertiesOnMessage(assignSender, testProperties); assignSender.setStringProperty("CONTROL_TYPE", "ASSIGN_ROLE"); assignSender.setStringProperty("ROLE", "SENDER"); + assignSender.setStringProperty("CLIENT_NAME", "Sustained_SENDER"); senderConversation.send(senderControlTopic, assignSender); @@ -170,6 +171,7 @@ public class SustainedTestCoordinator extends CoordinatingTestCase3BasicPubSub i setPropertiesOnMessage(assignReceiver, _testProperties); assignReceiver.setStringProperty("CONTROL_TYPE", "ASSIGN_ROLE"); assignReceiver.setStringProperty("ROLE", "RECEIVER"); + assignReceiver.setStringProperty("CLIENT_NAME", "Sustained_RECEIVER_" + receiver.clientName); receiverConversation.send(receiverControlTopic, assignReceiver); diff --git a/java/integrationtests/src/resources/sustained-log4j.xml b/java/integrationtests/src/resources/sustained-log4j.xml new file mode 100644 index 0000000000..c5ab3137bf --- /dev/null +++ b/java/integrationtests/src/resources/sustained-log4j.xml @@ -0,0 +1,69 @@ +<?xml version="1.0"?> +<!-- + - + - Licensed to the Apache Software Foundation (ASF) under one + - or more contributor license agreements. See the NOTICE file + - distributed with this work for additional information + - regarding copyright ownership. The ASF licenses this file + - to you under the Apache License, Version 2.0 (the + - "License"); you may not use this file except in compliance + - with the License. You may obtain a copy of the License at + - + - http://www.apache.org/licenses/LICENSE-2.0 + - + - Unless required by applicable law or agreed to in writing, + - software distributed under the License is distributed on an + - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + - KIND, either express or implied. See the License for the + - specific language governing permissions and limitations + - under the License. + - + --> +<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd"> +<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/"> + + + <appender name="FileAppender" class="org.apache.log4j.FileAppender"> + <param name="File" value="${QPID_WORK}/log/${logprefix}qpid${logsuffix}.log"/> + <param name="Append" value="false"/> + + <layout class="org.apache.log4j.PatternLayout"> + <param name="ConversionPattern" value="%d %-5p [%t] %C{2} (%F:%L) - %m%n"/> + </layout> + </appender> + + <appender name="STDOUT" class="org.apache.log4j.ConsoleAppender"> + + <layout class="org.apache.log4j.PatternLayout"> + <param name="ConversionPattern" value="%d %-5p (%F:%L) - %m%n"/> + <!--param name="ConversionPattern" value="%d %-5p [%t] %C{2} (%F:%L) - %m%n"/--> + </layout> + </appender> + + <category name="SustainedTest"> + <priority value="${sustained.level}"/> + </category> + + <category name="org.apache"> + <priority value="warn"/> + </category> + + <category name="org.apache.qpid.interop"> + <priority value="${interop.logging.level}"/> + </category> + + + <category name="org.apache.qpid.sustained"> + <priority value="${amqj.logging.level}"/> + </category> + + <!--category name="org.apache.qpid.server.txn"> + <priority value="debug"/> + </category>--> + + <root> + <priority value="all"/> + <appender-ref ref="STDOUT"/> + <!--appender-ref ref="ArchivingFileAppender"/--> + </root> +</log4j:configuration> |